""" Copyright Xplainable Pty Ltd, 2023"""
import json
import numpy as np
import pandas as pd
import pyperclip
import time
import inspect
import ast
from ..utils.api import get_response_content
from ..utils.encoders import NpEncoder, force_json_compliant
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from ..preprocessing.pipeline import XPipeline
from ..preprocessing import transformers as xtf
from ..utils.model_parsers import *
from ..utils.exceptions import AuthenticationError
from ..utils.helpers import get_df_delta
from ..quality.scanner import XScan
from ..metrics.metrics import evaluate_classification, evaluate_regression
from ..core.models import (XClassifier, XRegressor, PartitionedRegressor, PartitionedClassifier, ConstructorParams)
from ..config import OUTPUT_TYPE
class Client:
    """ A client for interfacing with the xplainable web api (xplainable cloud).

    Access models, preprocessors and user data from xplainable cloud. API keys
    can be generated at https://beta.xplainable.io.

    Args:
        api_key (str): A valid api key.
        hostname (str, optional): Base url of the xplainable API.
    """

    def __init__(self, api_key, hostname='https://api.xplainable.io'):
        self.__api_key = api_key
        self.hostname = hostname
        self.machines = {}
        self.__session = requests.Session()
        self._user = None
        self.avatar = None
        self._init()

    def _init(self):
        """ Authorize access to xplainable API.

        Active API Key is required for authorization. Sets the session
        headers and retry policy, then fetches the user data to resolve the
        organisation and team used in every subsequent request url.

        Raises:
            AuthenticationError: If user not authorized.
        """
        # Add token to session headers
        self.__session.headers['api_key'] = self.__api_key

        # Configure retry strategy and mount it for the API hostname
        retry_strategy = Retry(total=5, backoff_factor=1)
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.__session.mount(self.hostname, adapter)

        session_data = self.get_user_data()
        self.__org_id = session_data.pop('organisation_id')
        self.__team_id = session_data.pop('team_id')
        self.__ext = f'organisations/{self.__org_id}/teams/{self.__team_id}'
        self._user = session_data

        # The avatar is optional: it is only rendered when the gui
        # dependencies can be imported.
        try:
            import ipywidgets  # noqa: F401 (import used as availability check)
            from ..gui.components.cards import render_user_avatar
            self.avatar = render_user_avatar(self._user)
        except ImportError:
            pass

        self.xplainable_version = None
        self.python_version = None

    @staticmethod
    def _drop_keys(records, *keys):
        """ Removes the given keys (when present) from each record in place.

        Args:
            records (list): List of dictionaries to clean.
            *keys (str): Keys to remove from every dictionary.

        Returns:
            list: The same list object, for convenience.
        """
        for record in records:
            for key in keys:
                record.pop(key, None)
        return records

    @staticmethod
    def _to_jsonable(value):
        """ Round-trips a value through json using NpEncoder.

        The result only contains types that the json module produces on
        load, so it can be embedded in a request payload.
        """
        return json.loads(json.dumps(value, cls=NpEncoder))

    def list_models(self) -> list:
        """ Lists all models of the active user's team.

        Returns:
            list: List of saved models.
        """
        response = self.__session.get(
            url=f'{self.hostname}/v1/{self.__ext}/models'
        )
        data = get_response_content(response)

        # For better readability
        return self._drop_keys(data, 'user', 'contributors')

    def list_model_versions(self, model_id: int) -> list:
        """ Lists all versions of a model.

        Args:
            model_id (int): The model id

        Returns:
            list: List of model versions.
        """
        response = self.__session.get(
            url=f'{self.hostname}/v1/{self.__ext}/models/{model_id}/versions'
        )
        data = get_response_content(response)
        return self._drop_keys(data, 'user')

    def list_preprocessors(self) -> list:
        """ Lists all preprocessors of the active user's team.

        Returns:
            list: List of preprocessors.
        """
        response = self.__session.get(
            url=f'{self.hostname}/v1/{self.__ext}/preprocessors'
        )
        data = get_response_content(response)
        return self._drop_keys(data, 'user')

    def list_preprocessor_versions(self, preprocessor_id: int) -> list:
        """ Lists all versions of a preprocessor.

        Args:
            preprocessor_id (int): The preprocessor id

        Returns:
            list: List of preprocessor versions.
        """
        response = self.__session.get(
            url=(
                f'{self.hostname}/v1/{self.__ext}/preprocessors/'
                f'{preprocessor_id}/versions'
            )
        )
        data = get_response_content(response)
        return self._drop_keys(data, 'user')

    def load_preprocessor(
            self, preprocessor_id: int, version_id: int,
            gui_object: bool = False, response_only: bool = False):
        """ Loads a preprocessor by preprocessor_id and version_id.

        Args:
            preprocessor_id (int): The preprocessor id
            version_id (int): The version id
            gui_object (bool, optional): Returns a gui Preprocessor object
                wrapping the pipeline instead of the pipeline itself.
            response_only (bool, optional): Returns the preprocessor metadata.

        Returns:
            xplainable.preprocessing.pipeline.Pipeline: The loaded pipeline

        Raises:
            ValueError: If the preprocessor version cannot be fetched, or a
                stage names a transformer that does not exist.
        """

        def build_transformer(stage):
            """Build transformer from metadata"""
            if not hasattr(xtf, stage["name"]):
                raise ValueError(
                    f"{stage['name']} does not exist in the transformers module")

            # Get transformer function
            func = getattr(xtf, stage["name"])
            return func(**stage['params'])

        try:
            preprocessor_response = self.__session.get(
                url=(
                    f'{self.hostname}/v1/{self.__ext}/preprocessors/'
                    f'{preprocessor_id}/versions/{version_id}'
                )
            )
            response = get_response_content(preprocessor_response)
        except Exception as e:
            # Keep the original failure attached as the cause
            raise ValueError(
                f'Preprocessor with ID {preprocessor_id}:{version_id} does not exist'
            ) from e

        if response_only:
            return response

        stages = response['stages']
        deltas = response['deltas']

        pipeline = XPipeline()
        pipeline.stages = [
            {
                "feature": i["feature"],
                "name": i["name"],
                "transformer": build_transformer(i)
            }
            for i in stages
        ]

        if not gui_object:
            return pipeline

        from ..gui.screens.preprocessor import Preprocessor
        pp = Preprocessor()
        pp.pipeline = pipeline
        pp.df_delta = deltas
        pp.state = len(pipeline.stages)
        return pp

    def load_classifier(self, model_id: int, version_id: int, model=None):
        """ Loads a binary classification model by model_id

        Args:
            model_id (str): A valid model_id
            version_id (str): A valid version_id
            model (PartitionedClassifier): An existing model to add partitions

        Returns:
            xplainable.PartitionedClassifier: The loaded xplainable classifier

        Raises:
            ValueError: If the model does not exist or is not a binary
                classification model.
        """
        response = self.__get_model__(model_id, version_id)
        if response['model_type'] != 'binary_classification':
            raise ValueError(
                f'Model with ID {model_id}:{version_id} is not a binary classification model')
        return parse_classifier_response(response, model)

    def load_regressor(self, model_id: int, version_id: int, model=None):
        """ Loads a regression model by model_id and version_id

        Args:
            model_id (str): A valid model_id
            version_id (str): A valid version_id
            model (PartitionedRegressor): An existing model to add partitions to

        Returns:
            xplainable.PartitionedRegressor: The loaded xplainable regressor

        Raises:
            ValueError: If the model does not exist or is not a regression
                model.
        """
        response = self.__get_model__(model_id, version_id)
        if response['model_type'] != 'regression':
            raise ValueError(
                f'Model with ID {model_id}:{version_id} is not a regression model')
        return parse_regressor_response(response, model)

    def __get_model__(self, model_id: int, version_id: int):
        """ Fetches the raw content of a model version.

        Raises:
            ValueError: If the request or the response parsing fails.
        """
        try:
            response = self.__session.get(
                url=(
                    f'{self.hostname}/v1/{self.__ext}/models/{model_id}/'
                    f'versions/{version_id}'
                )
            )
            return get_response_content(response)
        except Exception as e:
            raise ValueError(
                f'Model with ID {model_id}:{version_id} does not exist') from e

    def get_user_data(self) -> dict:
        """ Retrieves the user data for the active user.

        Returns:
            dict: User data

        Raises:
            AuthenticationError: If the response status code is not 200.
        """
        response = self.__session.get(
            url=f'{self.hostname}/v1/client-connect'
        )
        if response.status_code == 200:
            return get_response_content(response)

        # The error body is not guaranteed to be json with a 'detail' key;
        # never let that mask the authentication failure itself.
        try:
            detail = response.json()['detail']
        except (ValueError, KeyError, TypeError):
            detail = ''

        raise AuthenticationError(
            f"{response.status_code} Unauthenticated. "
            f"{detail}"
        )

    def create_preprocessor_id(
            self, preprocessor_name: str, preprocessor_description: str) -> str:
        """ Creates a new preprocessor and returns the preprocessor id.

        Args:
            preprocessor_name (str): The name of the preprocessor
            preprocessor_description (str): The description of the preprocessor

        Returns:
            int: The preprocessor id
        """
        payload = {
            "preprocessor_name": preprocessor_name,
            "preprocessor_description": preprocessor_description
        }
        response = self.__session.post(
            url=f'{self.hostname}/v1/{self.__ext}/create-preprocessor',
            json=payload
        )
        preprocessor_id = get_response_content(response)
        return preprocessor_id

    def create_preprocessor_version(
            self, preprocessor_id: str, pipeline: list, df: pd.DataFrame = None
    ) -> str:
        """ Creates a new preprocessor version and returns the version id.

        Args:
            preprocessor_id (int): The preprocessor id
            pipeline (xplainable.preprocessing.pipeline.Pipeline): pipeline
            df (pd.DataFrame, optional): Data to run through the pipeline.
                When given, a delta is recorded for every stage.

        Returns:
            int: The preprocessor version id
        """
        # Structure the stages and deltas
        stages = []
        deltas = []
        delta_gen = None

        if df is not None:
            before = df.copy()
            deltas.append({"start": json.loads(before.head(10).to_json(
                orient='records'))})
            delta_gen = pipeline.transform_generator(before)

        for stage in pipeline.stages:
            step = {
                'feature': stage['feature'],
                'name': stage['name'],
                'params': stage['transformer'].__dict__
            }
            stages.append(step)

            if df is not None:
                # One generator step per pipeline stage
                after = next(delta_gen)
                delta = get_df_delta(before.copy(), after.copy())
                deltas.append(delta)
                before = after.copy()

        # Get current versions
        versions = {
            "xplainable_version": self.xplainable_version,
            "python_version": self.python_version
        }

        # Create payload
        payload = {
            "stages": stages,
            "deltas": deltas,
            "versions": versions
        }

        # Create a new version and fetch id
        url = (
            f'{self.hostname}/v1/{self.__ext}/preprocessors/'
            f'{preprocessor_id}/add-version'
        )
        response = self.__session.post(url=url, json=payload)
        version_id = get_response_content(response)
        return version_id

    def _detect_model_type(self, model):
        """ Resolves the model type and target of a model.

        Partitioned models (class name containing 'Partitioned') are
        resolved through their '__dataset__' partition.

        Args:
            model: An XClassifier, XRegressor or a partitioned model.

        Returns:
            tuple: (model_type, target) where model_type is
                'binary_classification' or 'regression'.

        Raises:
            ValueError: If the model class is not supported.
        """
        if 'Partitioned' in model.__class__.__name__:
            model = model.partitions['__dataset__']

        cls_name = model.__class__.__name__
        if cls_name == "XClassifier":
            model_type = "binary_classification"
        elif cls_name == "XRegressor":
            model_type = "regression"
        else:
            raise ValueError(
                f'Model type {cls_name} is not supported')

        return model_type, model.target

    def create_model_id(
            self, model, model_name: str, model_description: str) -> str:
        """ Creates a new model and returns the model id.

        Args:
            model (XClassifier | XRegressor): The model to create.
            model_name (str): The name of the model
            model_description (str): The description of the model

        Returns:
            int: The model id
        """
        model_type, target = self._detect_model_type(model)
        payload = {
            "model_name": model_name,
            "model_description": model_description,
            "model_type": model_type,
            "target_name": target,
            "algorithm": model.__class__.__name__
        }
        response = self.__session.post(
            url=f'{self.hostname}/v1/{self.__ext}/create-model',
            json=payload
        )
        model_id = get_response_content(response)
        return model_id

    def create_model_version(
            self, model, model_id: str, x: pd.DataFrame, y: pd.Series) -> str:
        """ Creates a new model version and returns the version id.

        Args:
            model (XClassifier | XRegressor | Partitioned model): The model
                to log.
            model_id (int): The model id
            x (pd.DataFrame): Feature data used for evaluation and scanning.
            y (pd.Series): Target data used for evaluation.

        Returns:
            int: The model version id
        """
        # Get current versions
        versions = {
            "xplainable_version": self.xplainable_version,
            "python_version": self.python_version
        }

        cls_name = model.__class__.__name__
        partition_on = model.partition_on if 'Partitioned' in cls_name \
            else None

        payload = {
            "partition_on": partition_on,
            "versions": versions,
            "partitions": []
        }

        partitioned_models = ['PartitionedClassifier', 'PartitionedRegressor']
        independent_models = ['XClassifier', 'XRegressor']

        # get all partitions
        if cls_name in partitioned_models:
            for p, m in model.partitions.items():
                if p == '__dataset__':
                    part_x = x
                    part_y = y
                else:
                    # Partition keys are compared as strings
                    part_x = x[x[partition_on].astype(str) == str(p)]
                    part_y = y[y.index.isin(part_x.index)]
                pdata = self._get_partition_data(m, p, part_x, part_y)
                payload['partitions'].append(pdata)

        elif cls_name in independent_models:
            pdata = self._get_partition_data(model, '__dataset__', x, y)
            payload['partitions'].append(pdata)

        # Create a new version and fetch id
        url = f'{self.hostname}/v1/{self.__ext}/models/{model_id}/add-version'
        response = self.__session.post(
            url=url, json=force_json_compliant(payload))
        version_id = get_response_content(response)
        return version_id

    def _get_partition_data(
            self, model, partition_name: str, x: pd.DataFrame,
            y: pd.Series) -> dict:
        """ Builds the payload entry for a single model partition.

        Args:
            model (mixed): The model to log
            partition_name (str): The name of the partition
            x (pd.DataFrame): Feature data for the partition.
            y (pd.Series): Target data for the partition.

        Returns:
            dict: The partition data, including the evaluation, training
                metadata and (when x is given) feature health info.
        """
        model_type, _ = self._detect_model_type(model)

        data = {
            "partition": str(partition_name),
            "profile": json.dumps(model._profile, cls=NpEncoder),
            "feature_importances": self._to_jsonable(
                model.feature_importances),
            "id_columns": self._to_jsonable(model.id_columns),
            "columns": self._to_jsonable(model.columns),
            "parameters": model.params.to_json(),
            "base_value": self._to_jsonable(model.base_value),
            "feature_map": self._to_jsonable(
                {k: fm.forward for k, fm in model.feature_map.items()}),
            "target_map": self._to_jsonable(model.target_map.reverse),
            "category_meta": self._to_jsonable(model.category_meta),
            "calibration_map": None,
            "support_map": None
        }

        # Use the evaluation stored on the model when there is one,
        # otherwise evaluate on the data provided.
        evaluation = model.metadata.get('evaluation', {})

        if model_type == 'binary_classification':
            data.update({
                "calibration_map": self._to_jsonable(model._calibration_map),
                "support_map": self._to_jsonable(model._support_map)
            })
            if evaluation == {}:
                y_prob = model.predict_score(x)
                if model.target_map:
                    y = y.map(model.target_map)
                evaluation = {
                    'train': evaluate_classification(y, y_prob)
                }

        elif model_type == 'regression':
            if evaluation == {}:
                y_pred = model.predict(x)
                evaluation = {
                    'train': evaluate_regression(y, y_pred)
                }

        data["evaluation"] = json.dumps(evaluation, cls=NpEncoder)

        training_metadata = {
            i: v for i, v in model.metadata.items() if i != "evaluation"}
        data["training_metadata"] = json.dumps(
            training_metadata, cls=NpEncoder)

        if x is not None:
            scanner = XScan()
            scanner.scan(x)

            results = []
            for i, v in scanner.profile.items():
                feature_info = {
                    "feature": i,
                    "description": '',
                    "type": v['type'],
                    "health_info": self._to_jsonable(v)
                }
                results.append(feature_info)

            data["health_info"] = json.dumps(results, cls=NpEncoder)

        return data

    def list_deployments(self):
        """ Lists all deployments of the active user's team.

        Returns:
            The deployments as returned by the API.
        """
        response = self.__session.get(
            url=f'{self.hostname}/v1/{self.__ext}/deployments'
        )
        deployments = get_response_content(response)
        return deployments

    def deploy(
            self, model_id: str, version_id: str,
            hostname: str = 'https://inference.xplainable.io',
            location: str = 'syd', raw_output: bool = True) -> dict:
        """ Deploys a model version to xplainable cloud.

        The hostname should be the url of the inference server. For example:
        https://inference.xplainable.io

        Args:
            model_id (int): The model id
            version_id (int): The version id
            hostname (str): The host name for the inference server
            location (str): The deployment location sent to the API.
            raw_output (bool, optional): Currently unused.

        Returns:
            dict: deployment status and details, or a dictionary with a
                failure message when the status code is not 200.
        """
        url = (
            f'{self.hostname}/v1/{self.__ext}/models/{model_id}/versions/'
            f'{version_id}/deploy'
        )
        body = {
            "location": location
        }
        response = self.__session.put(url, json=body)

        if response.status_code == 200:
            deployment_id = response.json()['deployment_id']
            data = {
                "deployment_id": deployment_id,
                "status": "inactive",
                "location": location,
                "endpoint": f"{hostname}/v1/predict"
            }
            return data

        return {
            "message": f"Failed with status code {response.status_code}"}

    def activate_deployment(self, deployment_id):
        """ Activates a model deployment.

        Args:
            deployment_id (str): The deployment id

        Returns:
            dict: The json response, or a dictionary with a failure message
                when the status code is not 200.
        """
        url = (
            f'{self.hostname}/v1/{self.__ext}/deployments/'
            f'{deployment_id}/activate'
        )
        response = self.__session.patch(url)

        if response.status_code == 200:
            return response.json()

        return {
            "message": f"Failed with status code {response.status_code}"}

    def deactivate_deployment(self, deployment_id):
        """ Deactivates a model deployment.

        Args:
            deployment_id (str): The deployment id

        Returns:
            dict: The json response, or a dictionary with a failure message
                when the status code is not 200.
        """
        url = (
            f'{self.hostname}/v1/{self.__ext}/deployments/'
            f'{deployment_id}/deactivate'
        )
        response = self.__session.patch(url)

        if response.status_code == 200:
            return response.json()

        return {
            "message": f"Failed with status code {response.status_code}"}

    def generate_deploy_key(
            self, description: str, deployment_id: str,
            days_until_expiry: float = 90, clipboard: bool = True,
            surpress_output: bool = False
    ):
        """ Generates a deploy key for a model deployment.

        Args:
            description (str): Description of the deploy key use case.
            deployment_id (str): The deployment id.
            days_until_expiry (float): The number of days until the key expires.
            clipboard (bool): Copy the key to the clipboard. When False the
                json response is returned instead. Defaults to True.
            surpress_output (bool): Surpress output. Defaults to False.

        Returns:
            None when the key is copied to the clipboard, otherwise the json
            response containing the key.

        Raises:
            ConnectionError: If the request fails or returns no content.
        """
        url = (
            f'{self.hostname}/v1/{self.__ext}/deployments/'
            f'{deployment_id}/create-deploy-key'
        )
        params = {
            'description': description,
            'days_until_expiry': days_until_expiry
        }
        response = self.__session.put(
            url=url,
            json=params
        )

        # A failed request may carry no json at all, or a (truthy) error
        # body; neither must be mistaken for a deploy key.
        try:
            deploy_key = response.json()
        except ValueError:
            deploy_key = None

        if response.status_code >= 400 or not deploy_key:
            raise ConnectionError(
                f"Failed to generate deploy key. Code: {response.status_code}")

        if not clipboard:
            return deploy_key

        pyperclip.copy(deploy_key['deploy_key'])
        if not surpress_output:
            print("Deploy key copied to clipboard!")

    def generate_example_deployment_payload(self, deployment_id):
        """ Generates an example deployment payload for a deployment.

        Args:
            deployment_id (str): The deployment id.

        Returns:
            The json content of the response.
        """
        url = (
            f'{self.hostname}/v1/{self.__ext}/deployments/'
            f'{deployment_id}/payload'
        )
        response = self.__session.get(url)
        return response.json()

    @staticmethod
    def __parse_function(func):
        """ Parses a function to a middleware function.

        The function source is re-parsed, the function is renamed to
        'middleware' and recompiled in the globals of the original function.
        The renamed source is attached to the result as ``source``.

        Args:
            func (function): A callable taking exactly one parameter.

        Returns:
            function: The renamed function with a ``source`` attribute.

        Raises:
            TypeError: If func is not callable.
            ValueError: If func does not take exactly one parameter.
        """
        import textwrap

        if not callable(func):
            raise TypeError("Function must be callable")

        sig = inspect.signature(func)
        params = list(sig.parameters.values())
        if len(params) != 1:
            raise ValueError("Function must take one parameter")

        # Parse the source code to an AST. Functions defined at an indented
        # level (nested functions, methods) yield indented source, which
        # cannot be parsed as a module unless it is dedented first.
        source = textwrap.dedent(inspect.getsource(func))
        parsed_ast = ast.parse(source)

        # Rename the function in the AST
        for node in ast.walk(parsed_ast):
            if isinstance(node, ast.FunctionDef) and node.name == func.__name__:
                node.name = "middleware"
                break

        # Store the modified source
        modified_source = ast.unparse(parsed_ast)

        # Compile the AST back to code and execute in a new namespace.
        # NOTE: this executes the caller's own function source, not
        # external input.
        local_vars = {}
        exec(compile(
            parsed_ast, filename="<ast>", mode="exec"),
            func.__globals__, local_vars)

        middleware = local_vars['middleware']
        middleware.source = modified_source
        return middleware

    def add_deployment_middleware(
            self, deployment_id, func, name, description=None):
        """ Adds or replaces a middleware function to a deployment.

        Args:
            deployment_id (str): The deployment id
            func (function): The middleware function
            name (str): The name sent with the middleware.
            description (str, optional): The description sent with the
                middleware.

        Returns:
            The json content of the response.
        """
        import textwrap

        url = (
            f'{self.hostname}/v1/{self.__ext}/deployments/'
            f'{deployment_id}/add-middleware'
        )

        # Convert function to expected name
        if func.__name__ != 'middleware':
            func = self.__parse_function(func)
            source_code = func.source
        else:
            # Dedent so that a nested definition is sent as top-level code
            source_code = textwrap.dedent(inspect.getsource(func))

        body = {
            "code_block": source_code,
            "name": name,
            "description": description
        }
        response = self.__session.put(
            url=url,
            json=body
        )
        return response.json()

    def delete_deployment_middleware(self, deployment_id):
        """ Deletes a middleware function from a deployment.

        Args:
            deployment_id (str): The deployment id

        Returns:
            dict: The status code of the response.
        """
        url = (
            f'{self.hostname}/v1/{self.__ext}/deployments/'
            f'{deployment_id}/middleware'
        )
        response = self.__session.delete(url)
        return {"status_code": response.status_code}

    def _gpt_report(
            self, model_id, version_id, target_description='',
            project_objective='', max_features=15, temperature=0.5):
        """ Requests a generated report for a model version.

        Args:
            model_id (int): The model id
            version_id (int): The version id
            target_description (str, optional): Sent as-is to the API.
            project_objective (str, optional): Sent as-is to the API.
            max_features (int, optional): Sent as-is to the API.
            temperature (float, optional): Sent as-is to the API.

        Returns:
            The content of the response.
        """
        url = (
            f'{self.hostname}/v1/{self.__ext}/models/{model_id}/versions/'
            f'{version_id}/generate-report'
        )
        params = {
            'target_description': target_description,
            'project_objective': project_objective,
            'max_features': max_features,
            'temperature': temperature
        }
        response = self.__session.put(
            url=url,
            json=params,
        )
        content = get_response_content(response)
        return content