""" Copyright Xplainable Pty Ltd, 2023"""
import json
import numpy as np
import pandas as pd
import pyperclip
import time
from IPython.display import clear_output, display
from .._dependencies import _check_ipywidgets
from ..utils.api import get_response_content
from ..utils.encoders import NpEncoder
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from ..gui.screens.preprocessor import Preprocessor
from ..preprocessing import transformers as xtf
from ..utils.exceptions import AuthenticationError
from ..quality.scanner import XScan
from ..metrics.metrics import evaluate_classification, evaluate_regression
from ..core.models import (XClassifier, XRegressor, PartitionedRegressor,
PartitionedClassifier)
from ..config import OUTPUT_TYPE
[docs]class Client:
    """ A client for interfacing with the xplainable web api (xplainable cloud).

    Access models, preprocessors and user data from xplainable cloud. API keys
    can be generated at https://app.xplainable.io.

    Args:
        api_key (str): A valid api key.
        hostname (str, optional): Base URL of the xplainable API. Defaults to
            'https://api.xplainable.io'.
    """
def __init__(self, api_key, hostname='https://api.xplainable.io'):
    """ Stores the connection settings and authenticates the client.

    Args:
        api_key (str): The api key attached to the session by ``_init``.
        hostname (str, optional): Base URL used to build every endpoint.
    """
    # Connection settings
    self.hostname = hostname
    self.__api_key = api_key
    self.__session__ = requests.Session()

    # Client state; ``_user`` and ``avatar`` are filled in by ``_init``
    self.machines = {}
    self._user, self.avatar = None, None

    # Must run last: it relies on the session and api key set above
    self._init()
def _init(self):
    """ Authorise the session against the xplainable API.

    Attaches the api key to the session headers, mounts a retrying HTTP
    adapter on the configured hostname, then requests the user data to
    build the organisation/team URL prefix used by the other endpoints.

    Raises:
        AuthenticationError: Propagated from ``get_user_data`` when the
            user data request does not succeed.
    """
    session = self.__session__

    # Every request authenticates through this header
    session.headers['api_key'] = self.__api_key

    # Retry failed requests: 5 attempts in total, backoff factor of 1
    retry_policy = Retry(total=5, backoff_factor=1)
    session.mount(self.hostname, HTTPAdapter(max_retries=retry_policy))

    user_data = self.get_user_data()
    self.__org_id = user_data.pop('organisation_id')
    self.__team_id = user_data.pop('team_id')
    self.__ext = f'organisations/{self.__org_id}/teams/{self.__team_id}'

    # Whatever remains once the ids are removed is kept as the user record
    self._user = user_data

    # The avatar is optional: skipped when the GUI dependencies are missing
    try:
        import ipywidgets
        from ..gui.components.cards import render_user_avatar
        self.avatar = render_user_avatar(self._user)
    except ImportError:
        pass

    # Environment versions start unset
    self.xplainable_version = None
    self.python_version = None
def list_models(self) -> list:
    """ Lists all models of the active user's team.

    Returns:
        list: The model records returned by the API, with the 'user'
            field removed from each record.
    """
    response = self.__session__.get(
        url=f'{self.hostname}/v1/{self.__ext}/models'
    )
    models = get_response_content(response)

    # Strip the user details in place. A record without the field is left
    # untouched instead of raising KeyError.
    for record in models:
        record.pop('user', None)

    return models
def list_model_versions(self, model_id: int) -> list:
    """ Lists all versions of a model.

    Args:
        model_id (int): The model id

    Returns:
        list: The version records returned by the API, with the 'user'
            field removed from each record.
    """
    response = self.__session__.get(
        url=f'{self.hostname}/v1/{self.__ext}/models/{model_id}/versions'
    )
    versions = get_response_content(response)

    # Strip the user details in place. A record without the field is left
    # untouched instead of raising KeyError.
    for record in versions:
        record.pop('user', None)

    return versions
def list_preprocessors(self) -> list:
    """ Lists all preprocessors of the active user's team.

    Returns:
        list: The preprocessor records returned by the API, with the
            'user' field removed from each record.
    """
    response = self.__session__.get(
        url=f'{self.hostname}/v1/{self.__ext}/preprocessors'
    )
    preprocessors = get_response_content(response)

    # Strip the user details in place. A record without the field is left
    # untouched instead of raising KeyError.
    for record in preprocessors:
        record.pop('user', None)

    return preprocessors
def list_preprocessor_versions(self, preprocessor_id: int) -> list:
    """ Lists all versions of a preprocessor.

    Args:
        preprocessor_id (int): The preprocessor id

    Returns:
        list: The version records returned by the API, with the 'user'
            field removed from each record.
    """
    response = self.__session__.get(
        url=f'{self.hostname}/v1/{self.__ext}/preprocessors/{preprocessor_id}/versions'
    )
    versions = get_response_content(response)

    # Strip the user details in place. A record without the field is left
    # untouched instead of raising KeyError.
    for record in versions:
        record.pop('user', None)

    return versions
def load_preprocessor(
        self, preprocessor_id: int, version_id: int,
        response_only: bool = False):
    """ Loads a preprocessor by preprocessor_id and version_id.

    Args:
        preprocessor_id (int): The preprocessor id
        version_id (int): The version id
        response_only (bool, optional): When True, return the raw version
            metadata instead of building a preprocessor.

    Returns:
        xplainable.preprocessing.Preprocessor: The loaded preprocessor, or
            the response content when ``response_only`` is True.

    Raises:
        ValueError: If the version cannot be fetched or parsed (the
            underlying error is chained as ``__cause__``), or if a stage
            names a transformer missing from the transformers module.
    """

    def build_transformer(stage):
        """Build transformer from metadata"""
        transformer_cls = getattr(xtf, stage["name"], None)
        if transformer_cls is None:
            raise ValueError(f"{stage['name']} does not exist in the transformers module")

        return transformer_cls(**stage['params'])

    try:
        preprocessor_response = self.__session__.get(
            url=f'{self.hostname}/v1/{self.__ext}/preprocessors/{preprocessor_id}/versions/{version_id}'
        )
        response = get_response_content(preprocessor_response)

        if response_only:
            return response

        stages = response['stages']
        deltas = response['deltas']

    except Exception as e:
        # Chain the original error so network, authentication and schema
        # failures are not hidden behind the "does not exist" message.
        raise ValueError(
            f'Preprocessor with ID {preprocessor_id}:{version_id} does not exist'
        ) from e

    xp = Preprocessor()
    xp.pipeline.stages = [
        {
            "feature": stage["feature"],
            "name": stage["name"],
            "transformer": build_transformer(stage),
        }
        for stage in stages
    ]
    xp.df_delta = deltas
    # State is set to the number of loaded stages
    xp.state = len(xp.pipeline.stages)

    return xp
def load_classifier(self, model_id: int, version_id: int, model=None):
    """ Loads a binary classification model by model_id and version_id

    Args:
        model_id (int): A valid model_id
        version_id (int): A valid version_id
        model (PartitionedClassifier, optional): An existing model to add
            the loaded partitions to. A new one is created when omitted.

    Returns:
        xplainable.PartitionedClassifier: The loaded xplainable classifier

    Raises:
        ValueError: If the model cannot be fetched or is not a binary
            classification model.
    """
    response = self.__get_model__(model_id, version_id)

    if response['model_type'] != 'binary_classification':
        raise ValueError(f'Model with ID {model_id}:{version_id} is not a binary classification model')

    if model is None:
        partitioned_model = PartitionedClassifier(response['partition_on'])
    else:
        partitioned_model = model

    for p in response['partitions']:
        # Use a distinct name so the ``model`` argument is not shadowed
        clf = XClassifier()
        # dtype=object keeps per-feature arrays of different lengths intact
        clf._profile = np.array([
            np.array(i) for i in json.loads(p['profile'])], dtype=object)
        clf._calibration_map = p['calibration_map']
        clf._support_map = p['support_map']
        clf.base_value = p['base_value']
        clf.target_map = p['target_map']

        feature_map = p['feature_map']
        clf.feature_map = feature_map
        # Invert each column's mapping (value -> key)
        clf.feature_map_inv = {
            column: {value: key for key, value in mapping.items()}
            for column, mapping in feature_map.items()}

        clf.columns = p['columns']
        clf.id_columns = p['id_columns']
        # Materialise the keys: a dict_keys view cannot be pickled or
        # deep-copied, which would break persisting the loaded model.
        clf.categorical_columns = list(feature_map)
        clf.numeric_columns = [
            c for c in clf.columns if c not in feature_map]

        # JSON object keys arrive as strings; restore the integer keys
        clf.category_meta = {
            column: {
                name: {int(float(k)): v for k, v in values.items()}
                for name, values in meta.items()}
            for column, meta in p['category_meta'].items()}

        partitioned_model.add_partition(clf, p['partition'])

    return partitioned_model
def load_regressor(self, model_id: int, version_id: int, model=None):
    """ Loads a regression model by model_id and version_id

    Args:
        model_id (int): A valid model_id
        version_id (int): A valid version_id
        model (PartitionedRegressor, optional): An existing model to add
            the loaded partitions to. A new one is created when omitted.

    Returns:
        xplainable.PartitionedRegressor: The loaded xplainable regressor

    Raises:
        ValueError: If the model cannot be fetched or is not a regression
            model.
    """
    response = self.__get_model__(model_id, version_id)

    if response['model_type'] != 'regression':
        raise ValueError(f'Model with ID {model_id}:{version_id} is not a regression model')

    if model is None:
        partitioned_model = PartitionedRegressor(response['partition_on'])
    else:
        partitioned_model = model

    for p in response['partitions']:
        # Use a distinct name so the ``model`` argument is not shadowed
        reg = XRegressor()
        # dtype=object matches load_classifier and is required for
        # per-feature arrays of different lengths: NumPy >= 1.24 raises
        # ValueError when building a ragged array without it.
        reg._profile = np.array([
            np.array(i) for i in json.loads(p['profile'])], dtype=object)
        reg.base_value = p['base_value']
        reg.target_map = p['target_map']

        feature_map = p['feature_map']
        reg.feature_map = feature_map
        # Invert each column's mapping (value -> key)
        reg.feature_map_inv = {
            column: {value: key for key, value in mapping.items()}
            for column, mapping in feature_map.items()}

        reg.columns = p['columns']
        reg.id_columns = p['id_columns']
        # Materialise the keys: a dict_keys view cannot be pickled or
        # deep-copied, which would break persisting the loaded model.
        reg.categorical_columns = list(feature_map)
        reg.numeric_columns = [
            c for c in reg.columns if c not in feature_map]

        # JSON object keys arrive as strings; restore the integer keys
        reg.category_meta = {
            column: {
                name: {int(float(k)): v for k, v in values.items()}
                for name, values in meta.items()}
            for column, meta in p['category_meta'].items()}

        partitioned_model.add_partition(reg, p['partition'])

    return partitioned_model
def __get_model__(self, model_id: int, version_id: int):
    """ Fetches the content of a single model version.

    Args:
        model_id (int): The model id
        version_id (int): The version id

    Returns:
        The content extracted from the API response.

    Raises:
        ValueError: If the request or the response handling fails. The
            underlying error is chained as ``__cause__``.
    """
    try:
        response = self.__session__.get(
            url=f'{self.hostname}/v1/{self.__ext}/models/{model_id}/versions/{version_id}'
        )
        return get_response_content(response)

    except Exception as e:
        # Chain the original error so connection or authentication
        # failures are not hidden behind the "does not exist" message.
        raise ValueError(
            f'Model with ID {model_id}:{version_id} does not exist'
        ) from e
def get_user_data(self) -> dict:
    """ Retrieves the user data for the active user.

    Returns:
        dict: User data

    Raises:
        AuthenticationError: If the API does not answer with status 200.
    """
    endpoint = f'{self.hostname}/v1/client-connect'
    response = self.__session__.get(url=endpoint)

    # Any status other than 200 is reported as an authentication failure
    if response.status_code != 200:
        raise AuthenticationError("API key has expired or is invalid.")

    return get_response_content(response)
def create_preprocessor_id(
        self, preprocessor_name: str, preprocessor_description: str) -> str:
    """ Creates a new preprocessor and returns the preprocessor id.

    Args:
        preprocessor_name (str): The name of the preprocessor
        preprocessor_description (str): The description of the preprocessor

    Returns:
        The preprocessor id extracted from the API response.
    """
    request_body = {
        "preprocessor_name": preprocessor_name,
        "preprocessor_description": preprocessor_description
    }
    endpoint = f'{self.hostname}/v1/{self.__ext}/create-preprocessor'

    response = self.__session__.post(url=endpoint, json=request_body)

    return get_response_content(response)
def create_preprocessor_version(
        self, preprocessor_id: str, preprocessor) -> str:
    """ Creates a new preprocessor version and returns the version id.

    The pipeline stages and deltas are read from the given preprocessor
    and posted together with the client's environment versions.

    Args:
        preprocessor_id (int): The preprocessor id
        preprocessor: Object exposing ``pipeline.stages`` and ``df_delta``.

    Returns:
        The preprocessor version id extracted from the API response.
    """
    # Describe each stage by feature, name and the transformer's attributes
    stage_records = [
        {
            'feature': stage['feature'],
            'name': stage['name'],
            'params': stage['transformer'].__dict__
        }
        for stage in preprocessor.pipeline.stages
    ]

    request_body = {
        "stages": stage_records,
        "deltas": preprocessor.df_delta,
        "versions": {
            "xplainable_version": self.xplainable_version,
            "python_version": self.python_version
        }
    }

    # Create a new version and fetch id
    endpoint = (
        f'{self.hostname}/v1/{self.__ext}/preprocessors/'
        f'{preprocessor_id}/add-version'
    )
    response = self.__session__.post(url=endpoint, json=request_body)

    return get_response_content(response)
def _detect_model_type(self, model):
if 'Partitioned' in model.__class__.__name__:
model = model.partitions['__dataset__']
cls_name = model.__class__.__name__
if cls_name == "XClassifier":
model_type = "binary_classification"
elif cls_name == "XRegressor":
model_type = "regression"
else:
raise ValueError(
f'Model type {cls_name} is not supported')
return model_type, model.target
def create_model_id(
        self, model, model_name: str, model_description: str) -> str:
    """ Creates a new model and returns the model id.

    Args:
        model (XClassifier | XRegressor): The model to create.
        model_name (str): The name of the model
        model_description (str): The description of the model

    Returns:
        The model id extracted from the API response.

    Raises:
        ValueError: Propagated from ``_detect_model_type`` for an
            unsupported model class.
    """
    detected_type, target_name = self._detect_model_type(model)

    request_body = {
        "model_name": model_name,
        "model_description": model_description,
        "model_type": detected_type,
        "target_name": target_name,
        # Class name of the object passed in, partitioned or not
        "algorithm": model.__class__.__name__
    }
    endpoint = f'{self.hostname}/v1/{self.__ext}/create-model'

    response = self.__session__.post(url=endpoint, json=request_body)

    return get_response_content(response)
def create_model_version(
        self, model, model_id: str, x: pd.DataFrame, y: pd.Series) -> str:
    """ Creates a new model version and returns the version id.

    Args:
        model (XClassifier | XRegressor | PartitionedClassifier |
            PartitionedRegressor): The model to upload.
        model_id (int): The model id
        x (pd.DataFrame): Feature data forwarded to each partition.
        y (pd.Series): Target values forwarded to each partition.

    Returns:
        The model version id extracted from the API response.

    Raises:
        ValueError: If the model class is not supported. Raised before any
            request is made, so an empty version is never created.
    """
    partitioned_models = ('PartitionedClassifier', 'PartitionedRegressor')
    independent_models = ('XClassifier', 'XRegressor')

    cls_name = model.__class__.__name__
    if cls_name not in partitioned_models + independent_models:
        # Otherwise a version with zero partitions would be posted silently
        raise ValueError(
            f'Model type {cls_name} is not supported')

    # Get current versions
    versions = {
        "xplainable_version": self.xplainable_version,
        "python_version": self.python_version
    }

    partition_on = model.partition_on if 'Partitioned' in cls_name else None

    payload = {
        "partition_on": partition_on,
        "versions": versions,
        "partitions": []
    }

    if cls_name in partitioned_models:
        for p, m in model.partitions.items():
            if p == '__dataset__':
                # The '__dataset__' partition is given the full data
                part_x, part_y = x, y
            else:
                # Rows whose partition column matches this partition,
                # compared as strings
                part_x = x[x[partition_on].astype(str) == str(p)]
                part_y = y[y.index.isin(part_x.index)]

            payload['partitions'].append(
                self._get_partition_data(m, p, part_x, part_y))
    else:
        payload['partitions'].append(
            self._get_partition_data(model, '__dataset__', x, y))

    # Create a new version and fetch id
    url = f'{self.hostname}/v1/{self.__ext}/models/{model_id}/add-version'
    response = self.__session__.post(url=url, json=payload)

    return get_response_content(response)
def _get_partition_data(
        self, model, partition_name: str, x: pd.DataFrame,
        y: pd.Series) -> dict:
    """ Builds the payload describing one model partition.

    Args:
        model (mixed): The partition's model.
        partition_name (str): The name of the partition.
        x (pd.DataFrame): Feature data, used for the training evaluation
            (when the model metadata has none) and for the health scan.
        y (pd.Series): Target values used for the training evaluation.

    Returns:
        dict: The partition data, including evaluation, training metadata
            and, when ``x`` is not None, feature health information.
    """

    def via_json(value):
        """Round-trip a value through JSON using NpEncoder."""
        return json.loads(json.dumps(value, cls=NpEncoder))

    model_type, _ = self._detect_model_type(model)

    data = {
        "partition": str(partition_name),
        "profile": json.dumps(model._profile, cls=NpEncoder),
        "feature_importances": via_json(model.feature_importances),
        "id_columns": via_json(model.id_columns),
        "columns": via_json(model.columns),
        "target_map": via_json(model.target_map_inv),
        "parameters": via_json(model.params),
        "base_value": via_json(model.base_value),
        "feature_map": via_json(model.feature_map),
        "category_meta": via_json(model.category_meta),
        "calibration_map": None,
        "support_map": None
    }

    if model_type == 'binary_classification':
        # Only classifiers fill in the calibration and support maps
        calibration_map = via_json(model._calibration_map)
        support_map = via_json(model._support_map)
        data["calibration_map"] = calibration_map
        data["support_map"] = support_map

        evaluation = model.metadata.get('evaluation', {})
        if evaluation == {}:
            # No stored evaluation: score the supplied data instead
            y_prob = model.predict_score(x)

            if model.target_map:
                y = y.map(model.target_map)

            evaluation = {'train': evaluate_classification(y, y_prob)}

    elif model_type == 'regression':
        evaluation = model.metadata.get('evaluation', {})
        if evaluation == {}:
            # No stored evaluation: predict on the supplied data instead
            y_pred = model.predict(x)
            evaluation = {'train': evaluate_regression(y, y_pred)}

    data["evaluation"] = json.dumps(evaluation, cls=NpEncoder)

    # Everything in the metadata except the evaluation
    training_metadata = {
        key: value for key, value in model.metadata.items()
        if key != "evaluation"}
    data["training_metadata"] = json.dumps(training_metadata, cls=NpEncoder)

    if x is None:
        return data

    # Scan the feature data and record one entry per profiled feature
    scanner = XScan()
    scanner.scan(x)

    health_records = [
        {
            "feature": feature,
            "description": '',
            "type": info['type'],
            "health_info": via_json(info)
        }
        for feature, info in scanner.profile.items()
    ]
    data["health_info"] = json.dumps(health_records, cls=NpEncoder)

    return data
def deploy(
        self, hostname: None, model_id: str, version_id: str,
        partition_id: str, raw_output: bool=True) -> dict:
    """ Deploys a model partition to xplainable cloud.

    The hostname should be the url of the inference server. For example:
    https://inference.xplainable.io

    Args:
        hostname (str): The host name for the inference server. When None,
            the client's own hostname is used.
        model_id (int): The model id
        version_id (int): The version id
        partition_id (int): The partition id
        raw_output (bool, optional): returns a dictionary

    Returns:
        dict: deployment status and details, or a failure message when the
            API does not answer with status 200. Nothing is returned when
            the details are displayed as a widget instead.
    """
    base_url = self.hostname if hostname is None else hostname

    url = (
        f'{base_url}/v1/{self.__ext}/models/{model_id}/versions/'
        f'{version_id}/partitions/{partition_id}/deploy'
    )
    response = self.__session__.put(url)

    if response.status_code != 200:
        return {"message": f"Failed with status code {response.status_code}"}

    deployment_id = response.json()['deployment_id']

    data = {
        "deployment_id": deployment_id,
        "status": "active",
        "location": "sydney",
        "endpoint": f"{base_url}/v1/predict"
    }

    if raw_output or OUTPUT_TYPE == 'raw':
        return data

    # Widget output: a details table next to a deploy key button
    widgets = _check_ipywidgets()
    from ..gui.components import KeyValueTable

    table = KeyValueTable(
        data,
        transpose=False,
        padding="0px 20px 0px 5px",
        table_width='auto',
        header_color='#e8e8e8',
        border_color='#dddddd',
        header_font_color='#20252d',
        cell_font_color= '#374151'
    )

    def on_click(btn):
        """Generate a deploy key and report the outcome on the button."""
        try:
            self.generate_deploy_key(
                description='generated by python client',
                deployment_id=deployment_id,
                surpress_output=True
            )
            btn.description = "Copied to clipboard!"
            btn.disabled = True
        except Exception:
            # Show the failure for two seconds, then re-enable the button
            btn.description = "Failed. Try Again."
            btn.disabled = True
            time.sleep(2)
            btn.description = "Generate Deploy Key"
            btn.disabled = False

    key_button = widgets.Button(description="Generate Deploy Key")
    key_button.on_click(on_click)

    display(widgets.HBox([table.html_widget, key_button]))
def generate_deploy_key(
        self, description: str, deployment_id: str,
        days_until_expiry: float = 90, surpress_output: bool = False
        ) -> None:
    """ Generates a deploy key for a model deployment.

    Args:
        description (str): Description of the deploy key use case.
        deployment_id (str): The deployment id.
        days_until_expiry (float): The number of days until the key expires.
        surpress_output (bool): Surpress output. Defaults to False.

    Returns:
        None: No key is returned. The key is copied to the clipboard. When
            the response body is empty, the response status code is
            returned instead.
    """
    endpoint = f'{self.hostname}/v1/{self.__ext}/deployments/{deployment_id}/create-deploy-key'

    response = self.__session__.put(
        url=endpoint,
        json={
            'description': description,
            'days_until_expiry': days_until_expiry
        }
    )

    deploy_key = response.json()

    # An empty body means no key was issued
    if not deploy_key:
        return response.status_code

    pyperclip.copy(deploy_key)

    if surpress_output:
        return

    # Show the confirmation briefly, then clear the cell output
    print("Deploy key copied to clipboard!")
    time.sleep(2)
    clear_output()
def _gpt_report(
        self, model_id, version_id, target_info='', other_details=''):
    """ Requests a generated report for a model version.

    Args:
        model_id: The model id
        version_id: The version id
        target_info (str, optional): Sent as the 'target_info' query
            parameter.
        other_details (str, optional): Sent as the 'other_details' query
            parameter.

    Returns:
        The raw response content on status 200, otherwise the status code.
    """
    endpoint = (
        f'{self.hostname}/v1/{self.__ext}/models/{model_id}/versions/'
        f'{version_id}/generate-report'
    )
    query = {
        'target_info': target_info,
        'other_details': other_details
    }

    response = self.__session__.get(url=endpoint, params=query)

    succeeded = response.status_code == 200
    return response.content if succeeded else response.status_code