""" Copyright Xplainable Pty Ltd, 2023"""
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import numpy as np
import pandas as pd
from ._base_model import BaseModel, BasePartition
from ._constructor import XCatConstructor, XNumConstructor, ConstructorParams
from sklearn.metrics import *
from time import time
from typing import Union
[docs]class XClassifier(BaseModel):
""" Xplainable Classification model for transparent machine learning.
XClassifier offers powerful predictive power and complete transparency
for classification problems on tabular data. It is designed to be used
in place of black box models such as Random Forests and Gradient
Boosting Machines when explainabilty is important.
XClassifier is a feature-wise ensemble of decision trees. Each tree is
constructed using a custom algorithm that optimises for information with
respect to the target variable. The trees are then weighted and
normalised against one another to produce a variable step function
for each feature. The summation of these functions produces a score that can
be explained in real time. The score is a float value between 0 and 1
and represents the likelihood of the positive class occuring. The score
can also be mapped to a probability when probability is important.
When the fit method is called, the specified params are set across all
features. Following the initial fit, the update_feature_params method
may be called on a subset of features to update the params for those
features only. This allows for a more granular approach to model tuning.
Example:
>>> from xplainable.core.models import XClassifier
>>> import pandas as pd
>>> from sklearn.model_selection import train_test_split
>>> data = pd.read_csv('data.csv')
>>> x = data.drop(columns=['target'])
>>> y = data['target']
>>> x_train, x_test, y_train, y_test = train_test_split(
>>> x, y, test_size=0.2, random_state=42)
>>> model = XClassifier()
>>> model.fit(x_train, y_train)
>>> model.predict(x_test)
Args:
max_depth (int, optional): The maximum depth of each decision tree.
min_info_gain (float, optional): The minimum information gain required to make a split.
min_leaf_size (float, optional): The minimum number of samples required to make a split.
alpha (float, optional): Sets the number of possible splits with respect to unique values.
weight (float, optional): Activation function weight.
power_degree (float, optional): Activation function power degree.
sigmoid_exponent (float, optional): Activation function sigmoid exponent.
map_calibration (bool, optional): Maps the associated probability for each possible feature score.
"""
def __init__(
self,
max_depth=8,
min_info_gain=0.0001,
min_leaf_size=0.0001,
ignore_nan=False,
weight=1,
power_degree=1,
sigmoid_exponent=0,
tail_sensitivity: float = 1.0,
map_calibration: bool = True
):
super().__init__(
ConstructorParams(
max_depth,
min_info_gain,
min_leaf_size,
ignore_nan,
weight,
power_degree,
sigmoid_exponent,
tail_sensitivity
)
)
self._calibration_map = {}
self._support_map = {}
self.map_calibration = map_calibration
def _map_calibration(self, y, y_prob, smooth=15):
""" Maps the associated probability for each possible feature score.
Args:
x (pandas.DataFrame): The x variables used for training.
y (pandas.Series): The target series.
"""
# Make prediction and set to x
x = pd.DataFrame(
{
'y_prob': y_prob,
'target': y.copy().to_numpy()
})
# Record prediction bins
x['bin'] = pd.cut(x['y_prob'], [i / 100 for i in range(0, 101, 5)])
# Get target info grouped by bins
df = x.groupby('bin').agg({'target': ['mean', 'count']})
# Fix column formatting
df.columns = df.columns.map('_'.join)
df = df.rename(columns={'target_mean': 'tm', 'target_count': 'tc'})
# Record last and next scores for normalisation
df['lc'] = df['tc'].shift(1)
df['nc'] = df['tc'].shift(-1)
df['lm'] = df['tm'].shift(1)
df['nm'] = df['tm'].shift(-1)
# np.nan is the same as 0
df.fillna(0, inplace=True)
# Record rolling total for normalisation calc
df['rt'] = df['tc'].rolling(3, center=True, min_periods=2).sum()
# Scale counts to rolling total
df['tc_pct'] = df['tc'] / df['rt']
df['lc_pct'] = df['lc'] / df['rt']
df['nc_pct'] = df['nc'] / df['rt']
# Calculate weighted probability
df['wp'] = (df['lc_pct'] * df['lm']) + (df['tc_pct'] * df['tm']) + \
(df['nc_pct'] * df['nm'])
# Forward fill zero values
df['wp'] = df['wp'].replace(
to_replace=0, method='ffill')
# Get weighted probability and arrange
wp = df['wp']
wp = pd.DataFrame(np.repeat(wp.to_numpy(), 5, axis=0))
wp = pd.concat([wp, wp.iloc[99]], ignore_index=True)
# Forward fill nan values
wp = wp.fillna(method='ffill')
# Fill missing values that could not be
# forward filled
wp = wp.fillna(0)
# Calculate support at each bin
s = df['tc']
s = pd.DataFrame(np.repeat(s.to_numpy(), 5, axis=0))
s = pd.concat([s, s.iloc[99]], ignore_index=True)
s = s.fillna(method='ffill')
s = s.fillna(0)
self._support_map.update(dict(s[0]))
wp = wp.rolling(smooth, center=True, min_periods=3).mean()[0]
wp.fillna(method='ffill')
wp.fillna(0)
# Store results dict to class variable
return dict(wp)
[docs] def fit(
self, x: Union[pd.DataFrame, np.ndarray],
y: Union[pd.Series, np.array], id_columns: list = [],
column_names: list = None, target_name: str = 'target', alpha=0.1
) -> 'XClassifier':
""" Fits the model to the data.
Args:
x (pd.DataFrame | np.ndarray): The x variables used for training.
y (pd.Series | np.array): The target values.
id_columns (list, optional): id_columns to ignore from training.
column_names (list, optional): column_names to use for training if using a np.ndarray
target_name (str, optional): The name of the target column if using a np.array
alpha (float): Controlls the number of possible splits with respect to unique values.
Returns:
XClassifier: The fitted model.
"""
start = time()
x, y, x_cal, y_cal = super()._fit_check(
x,
y,
id_columns,
column_names,
target_name,
map_calibration=self.map_calibration
)
for i in range(x.shape[1]):
f = x[:, i]
# chooses constructor type based on type of input feature
constructor = XCatConstructor if self.columns[i] in self.categorical_columns else XNumConstructor
xconst = constructor(False, self.default_parameters.__copy__())
xconst.fit(f, y, alpha)
xconst.construct()
self._constructs.append(xconst)
self._build_profile()
# Calibration map
if self.map_calibration:
if len(self.target_map) > 0:
y_cal = y_cal.map(self.target_map)
y_prob = self.predict_score(x_cal)
self._calibration_map = self._map_calibration(y_cal, y_prob, 15)
# record metadata
self.metadata['fit_time'] = time() - start
self.metadata['observations'] = x.shape[0]
self.metadata['features'] = x.shape[1]
return self
[docs] def update_feature_params(
self,
features: list,
max_depth=None,
min_info_gain=None,
min_leaf_size=None,
ignore_nan=None,
weight=None,
power_degree=None,
sigmoid_exponent=None,
tail_sensitivity=None,
x: Union[pd.DataFrame, np.ndarray] = None,
y: Union[pd.Series, np.array] = None, *args, **kwargs
) -> 'XClassifier':
""" Updates the parameters for a subset of features.
XClassifier allows you to update the parameters for a subset of features
for a more granular approach to model tuning. This is useful when you
identify under or overfitting on some features, but not all.
This also referred to as 'refitting' the model to a new set of params.
Refitting parameters to an xplainable model is extremely fast as it has
already pre-computed the complex metadata required for training.
This can yeild huge performance gains compared to refitting
traditional models, and is particularly powerful when parameter tuning.
The desired result is to have a model that is well calibrated across all
features without spending considerable time on parameter tuning.
Args:
features (list): The features to update.
max_depth (int): The maximum depth of each decision tree in the subset.
min_info_gain (float): The minimum information gain required to make a split in the subset.
min_leaf_size (float): The minimum number of samples required to make a split in the subset.
ignore_nan (bool): Whether to ignore nan/null/empty values
weight (float): Activation function weight.
power_degree (float): Activation function power degree.
sigmoid_exponent (float): Activation function sigmoid exponent.
tail_sensitivity (float): Adds weight to divisive leaf nodes in the subset.
x (pd.DataFrame | np.ndarray, optional): The x variables used for training. Use if map_calibration is True.
y (pd.Series | np.array, optional): The target values. Use if map_calibration is True.
Returns:
XClassifier: The refitted model.
"""
super().update_feature_params(
features,
max_depth=max_depth,
min_info_gain=min_info_gain,
min_leaf_size=min_leaf_size,
ignore_nan=ignore_nan,
weight=weight,
power_degree=power_degree,
sigmoid_exponent=sigmoid_exponent,
tail_sensitivity=tail_sensitivity,
*args, **kwargs
)
if self.map_calibration and x is not None and y is not None:
y_prob = self.predict_score(x)
self._calibration_map = self._map_calibration(y, y_prob, 15)
return self
[docs] def predict_score(self, x: Union[pd.DataFrame, np.ndarray]) -> np.array:
""" Predicts the score for each row in the data.
Args:
x (pd.DataFrame | np.ndarray): The x variables to predict.
Returns:
np.array: The predicted scores
"""
trans = self._transform(x)
scores = np.sum(trans, axis=1) + self.base_value
return scores
[docs] def predict_proba(self, x: Union[pd.DataFrame, np.ndarray]) -> np.array:
""" Predicts the probability for each row in the data.
Args:
x (pd.DataFrame | np.ndarray): The x variables to predict.
Returns:
np.array: The predicted probabilities
"""
scores = self.predict_score(x) * 100
scores = scores.astype(int)
scores = np.vectorize(self._calibration_map.get)(scores)
return scores
[docs] def predict(
self, x: Union[pd.DataFrame, np.ndarray],
use_prob: bool=False, threshold: float = 0.5,
remap: bool = True
) -> np.array:
""" Predicts the target for each row in the data.
Args:
x (pd.DataFrame | np.ndarray): The x variables to predict.
use_prob (bool, optional): Use probability instead of score.
threshold (float, optional): The threshold to use for classification.
remap (bool, optional): Remap the target values to their original values.
Returns:
np.array: The predicted targets
"""
scores = self.predict_proba(x) if use_prob else self.predict_score(x)
pred = (scores > threshold).astype(int)
if len(self.target_map) > 0 and remap:
pred = np.vectorize(self.target_map.reverse.get)(pred)
return pred
[docs] def predict_explain(self, x):
""" Predictions with explanations.
Args:
x (array-like): data to predict
Returns:
pd.DataFrame: prediction and explanation
"""
t = super().predict_explain(x)
t['proba'] = (t['score'] * 100).astype(int).map(self._calibration_map)
t['support'] = (t['score'] * 100).astype(int).map(self._support_map)
return t
[docs] def evaluate(
self, x: Union[pd.DataFrame, np.ndarray],
y: Union[pd.Series, np.array], use_prob: bool = False,
threshold: float = 0.5
):
""" Evaluates the model performance.
Args:
x (pd.DataFrame | np.ndarray): The x variables to predict.
y (pd.Series | np.array): The target values.
use_prob (bool, optional): Use probability instead of score.
threshold (float, optional): The threshold to use for classification.
Returns:
dict: The model performance metrics.
"""
# Make predictions
y_prob = self.predict_proba(x) if use_prob else self.predict_score(x)
y_prob = np.clip(y_prob, 0, 1) # because of rounding errors
y_pred = (y_prob > threshold).astype(int)
if (len(self.target_map) > 0) and (y.dtype == 'object'):
y = y.copy().map(self.target_map)
# Calculate metrics
cm = confusion_matrix(y, y_pred).tolist()
cr = classification_report(y, y_pred, output_dict=True, zero_division=0)
try:
roc_auc = roc_auc_score(y, y_prob)
except Exception:
roc_auc = np.nan
try:
brier_loss = 1 - brier_score_loss(y, y_prob)
except Exception:
brier_loss = np.nan
try:
cohen_kappa = cohen_kappa_score(y, y_pred)
except Exception:
cohen_kappa = np.nan
try:
log_loss_score = log_loss(y, y_prob)
except Exception:
log_loss_score = np.nan
# Produce output
evaluation = {
'confusion_matrix': cm,
'classification_report': cr,
'roc_auc': roc_auc,
'neg_brier_loss': brier_loss,
'log_loss': log_loss_score,
'cohen_kappa': cohen_kappa
}
return evaluation
[docs]class PartitionedClassifier(BasePartition):
""" Partitioned XClassifier model.
This class is a wrapper for the XClassifier model that allows for
individual models to be trained on subsets of the data. Each model
can be used in isolation or in combination with the other models.
Individual models can be accessed using the partitions attribute.
Example:
>>> from xplainable.core.models import PartitionedClassifier
>>> import pandas as pd
>>> from sklearn.model_selection import train_test_split
>>> data = pd.read_csv('data.csv')
>>> train, test = train_test_split(data, test_size=0.2)
>>> # Train your model (this will open an embedded gui)
>>> partitioned_model = PartitionedClassifier(partition_on='partition_column')
>>> # Iterate over the unique values in the partition column
>>> for partition in train['partition_column'].unique():
>>> # Get the data for the partition
>>> part = train[train['partition_column'] == partition]
>>> x_train, y_train = part.drop('target', axis=1), part['target']
>>> # Fit the embedded model
>>> model = XClassifier()
>>> model.fit(x_train, y_train)
>>> # Add the model to the partitioned model
>>> partitioned_model.add_partition(model, partition)
>>> # Prepare the test data
>>> x_test, y_test = test.drop('target', axis=1), test['target']
>>> # Predict on the partitioned model
>>> y_pred = partitioned_model.predict(x_test)
Args:
partition_on (str, optional): The column to partition on.
"""
def __init__(self, partition_on: str=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.partition_on = partition_on
[docs] def predict_score(
self, x: Union[pd.DataFrame, np.ndarray], proba: bool = False):
""" Predicts the score for each row in the data across all partitions.
The partition_on columns will be used to determine which model to use
for each observation. If the partition_on column is not present in the
data, the '__dataset__' model will be used.
Args:
x (pd.DataFrame | np.ndarray): The x variables to predict.
Returns:
np.array: The predicted scores
"""
x = pd.DataFrame(x).copy().reset_index(drop=True)
if self.partition_on is None:
model = self.partitions['__dataset__']
return model.predict_score(x)
else:
partitions = self.partitions.keys()
frames = []
unq = list(x[self.partition_on].unique())
# replace unknown partition values with __dataset__ for general model
for u in unq:
if u not in partitions:
x[self.partition_on] = x[self.partition_on].replace(u, '__dataset__')
unq.remove(u)
if "__dataset__" not in unq:
unq.append("__dataset__")
partition_map = []
for partition in unq:
part = x[x[self.partition_on] == partition]
idx = part.index
# Use partition model first
part_trans = self.partitions[partition]._transform(part)
_base_value = self.partitions[partition].base_value
scores = pd.Series(part_trans.sum(axis=1) + _base_value)
scores.index = idx
frames.append(scores)
if proba:
[partition_map.append((i, partition)) for i in idx]
all_scores = np.array(pd.concat(frames).sort_index())
if proba:
partition_map = np.array(partition_map)
partition_map = partition_map[partition_map[:, 0].argsort()][:,1]
return all_scores, partition_map
return all_scores
[docs] def predict_proba(self, x):
""" Predicts the probability for each row in the data across all partitions.
The partition_on columns will be used to determine which model to use
for each observation. If the partition_on column is not present in the
data, the '__dataset__' model will be used.
Args:
x (pd.DataFrame | np.ndarray): The x variables to predict.
Returns:
np.array: The predicted probabilities
"""
if self.partition_on is None:
model = self.partitions['__dataset__']
return model.predict_proba(x)
scores, partition_map = self.predict_score(x, True)
scores = (scores * 100).astype(int)
def get_proba(p, score):
mapp = self.partitions[str(p)]._calibration_map
return mapp.get(score)
scores = np.vectorize(get_proba)(partition_map, scores)
return scores
[docs] def predict(self, x, use_prob = False, threshold = 0.5):
""" Predicts the target for each row in the data across all partitions.
The partition_on columns will be used to determine which model to use
for each observation. If the partition_on column is not present in the
data, the '__dataset__' model will be used.
Args:
x (pd.DataFrame | np.ndarray): The x variables to predict.
Returns:
np.array: The predicted targets
"""
# Get the score for each observation
y_pred = self.predict_proba(x) if use_prob else self.predict_score(x)
# Return 1 if feature value > threshold else 0
pred = pd.Series(y_pred).map(lambda x: 1 if x >= threshold else 0)
map_inv = self.partitions['__dataset__'].target_map.reverse
if map_inv:
return np.array(pred.map(map_inv))
else:
return np.array(pred)