""" Copyright Xplainable Pty Ltd, 2023"""
import numpy as np
import random
import math
from numba import njit, prange
from ...utils.numba_funcs import *
from .genetic import XEvolutionaryNetwork
import warnings
from tqdm.auto import tqdm
[docs]class BaseLayer:
""" Base class for optimisation layers.
Args:
metric (str, optional): Metric to optimise on. Defaults to 'mae'.
"""
def __init__(self, metric='mae'):
self.xnetwork = None
self.metric = metric
def _calculate_error(self, x, y):
""" Calculates the error of a set of predictions.
Args:
x (numpy.array): An array of transformed values.
y (numpy.array): An array of the true values.
Returns:
np.array: An array of the errors.
"""
# calculate relative error
_pred = nansum_numba(x, axis=1) + self.xnetwork.model.base_value
if self.xnetwork.static_scores is not None:
_pred = _pred + self.xnetwork.static_scores
if self.xnetwork.apply_range:
_pred = _pred.clip(*self.xnetwork.model.prediction_range)
return _pred - y
def _increment_error(self, x, y):
pass
@staticmethod
def _score():
pass
@staticmethod
@njit
def _mae(err):
mae = np.mean(np.abs(err))
return mae
@staticmethod
@njit
def _mse(err):
mse = np.mean(err**2)
return mse
def _initialise(self, metric):
# Infer objective from metric
if metric == 'mse':
self._score = self._mse
self.objective = 'minimise'
elif metric == 'mae':
self._score = self._mae
self.objective = 'minimise'
else:
raise ValueError(f'Metric {metric} not supported')
[docs]class Evolve(BaseLayer):
""" Evolutionary algorithm to optimise XRegressor leaf weights.
The Evolve layer uses a genetic algorithm to optimise the leaf weights
of an XRegressor model. The algorithm works by mutating the leaf weights
of the model and scoring the resulting predictions. The best mutations
are then selected to reproduce and mutate again. This process is repeated
until the maximum number of generations is reached, or the early stopping
threshold is reached.
Args:
mutations (int, optional): The number of mutations to generate per generation.
generations (int, optional): The number of generations to run.
max_generation_depth (int, optional): The maximum depth of a generation.
max_severity (float, optional): The maximum severity of a mutation.
max_leaves (int, optional): The maximum number of leaves to mutate.
early_stopping (int, optional): Stop early if no improvement after n iters.
"""
def __init__(
self, mutations: int = 100, generations: int = 50,
max_generation_depth: int = 10, max_severity: float = 0.5,
max_leaves: int = 20, early_stopping: int = None):
super().__init__()
self.mutations = mutations
self.generations = generations
self.max_generation_depth = max_generation_depth
self.max_severity = max_severity
self.max_leaves = max_leaves
self.early_stopping = early_stopping
self.objective = None
self.xnetwork = None
self.y = None
self._error = None
self.generation_id = 1
def _get_params(self) -> dict:
""" Returns the parameters of the layer.
Returns:
dict: The layer parameters.
"""
params = {
'mutations': self.mutations,
'generations': self.generations,
'max_generation_depth': self.max_generation_depth,
'max_severity': self.max_severity,
'max_leaves':self.max_leaves,
'metric': self.metric
}
return params
@property
def params(self) -> dict:
""" Returns the parameters of the layer.
Returns:
dict: The layer parameters.
"""
return self._get_params()
def _mutate(self, chromosome: np.array) -> np.array:
""" Randomly mutates a chromosome.
Args:
chromosome (np.array): The chromosome to mutate.
Returns:
np.array: The mutated chromosome.
"""
new_chromosome = np.array(chromosome)#.copy()
chrome_length = chromosome.shape[0]
# randomly select number of leaves to mutate
num_leaves = random.sample(
[i for i in range(1, int(min([self.max_leaves, chrome_length])+1))],k=1)[0]
# identify features to mutate
rand_indices = random.choices([i for i in range(int(chrome_length))], k=num_leaves)
# Initiate delta mapping (store changes only)
delta = {}
for i in rand_indices:
severity = random.randrange(1, int(self.max_severity * 1000 + 1)) / 1000
mutation = random.choice([1 + severity, 1 - severity])
delta[i] = mutation
new_chromosome[i] *= mutation
return new_chromosome, delta
def _n_mutations(self, chromosome: np.array, n: int) -> tuple:
""" Generates n mutations of a chromosome.
Args:
chromosome (np.array): The chromosome to mutate.
n (int): The number of mutations to generate.
Returns:
tuple: The mutated chromosomes and deltas.
"""
mutations = []
deltas = []
for _ in range(n):
new_chromosome, delta = self._mutate(chromosome)
mutations.append(new_chromosome)
deltas.append(delta)
return np.array(mutations), np.array(deltas)
@staticmethod
@njit(parallel=True, fastmath=True)
def _mutate_transform(x, delta_keys, delta_values):
for k in prange(len(delta_keys)):
i = int(delta_keys[k])
v = delta_values[k]
mask = ~np.isnan(x[:, i])
for j in prange(x.shape[0]):
if mask[j]:
x[j, i] = x[j, i] * v
return x
@staticmethod
@njit(parallel=True, fastmath=True)
def _inverse_mutate_transform(x, delta_keys, delta_values):
""" Inverse mutation is faster than copying and altering the full array """
for k in prange(len(delta_keys)):
i = int(delta_keys[k])
v = delta_values[k]
mask = ~np.isnan(x[:, i])
for j in prange(x.shape[0]):
if mask[j]:
x[j, i] = x[j, i] / v
return x
def _score_mutation(self, x: np.ndarray, delta: dict) -> float:
""" Scores a mutation of a chromosome.
Args:
x (np.ndarray): The input variables used for prediction.
delta (dict): The mutation to apply.
"""
# get the indices and delta changes for each mutation
dkeys = np.array(list(delta.keys()))
dvalues = np.array(list(delta.values()))
# Apply mutation
x = self._mutate_transform(x, dkeys, dvalues)
# Score mutation
err = self._calculate_error(x, self.y)
score = self._score(err)
# reverse mutation to maintain initial structure
x = self._inverse_mutate_transform(x, dkeys, dvalues)
return score
def _reproduce(self, pair: tuple) -> dict:
""" Merges the genes of a chromosome pair
Args:
pair (tuple): The pair of chromosomes to merge.
"""
# get parent chromosomes
a, b = pair
child = (a + b) / 2
return child
def _get_delta(self, chromosome: np.array) -> dict:
""" Gets the delta values for a chromosome.
Args:
chromosome (np.array): The chromosome to get deltas for.
Returns:
dict: The delta values.
"""
return {
i: v / self.target_chromosome[i] for i, v in enumerate(
chromosome) if v != self.target_chromosome[i]
}
def _natural_selection(
self, x: np.ndarray, mutations: np.array, deltas: dict
) -> np.array:
""" Selects the best (superior) mutations from a pool.
Args:
x (np.ndarray): The input variables used for prediction.
mutations (np.array): The mutations to select from.
deltas (dict): The delta values for each mutation.
Returns:
np.array: The selected superior mutations.
"""
# Score mutations
scores = np.array([])
for delta in deltas:
scores = np.append(scores, self._score_mutation(x, delta))
# Filter for superior mutations
if self.objective == 'minimise':
_filter = np.array(
[True if i < self.target_score else False for i in scores])
else:
_filter = np.array(
[True if i > self.target_score else False for i in scores])
# Select superior mutations
superiors = mutations[_filter]
if superiors.shape[0] == 0:
return np.array([[]])
top_scores = scores[_filter]
self.target_score = np.mean(top_scores)
# when there are 2 equal scores, sorting is skipped
if superiors.shape[0] > 1:
try:
superiors = np.array([x for _, x in sorted(zip(top_scores, superiors))])
except Exception as e:
pass
return superiors
def _reproduce_and_mutate(self, superiors: np.array) -> tuple:
""" Reproduces and mutates a pool of superior chromosomes.
Args:
superiors (np.array): The superior chromosomes to reproduce.
"""
# Pair mates together
mates = [[superiors[i], superiors[i+1]] for i in range(0, superiors.shape[0] - 1, 2)]
# Generate child chromosomes
offspring = [self._reproduce(mate) for mate in mates]
# Add most superior parent to pool
offspring.append(superiors[0])
for child in offspring.copy():
offspring.append(self._mutate(child)[0])
# Extract deltas from offspring
deltas = [self._get_delta(i) for i in offspring]
return np.array(offspring), np.array(deltas)
def _run_generation(self, x: np.ndarray) -> np.array:
""" Runs a single optimisation generation.
Args:
x (np.ndarray): The input variables used for prediction.
"""
# Generate first n mutations
m = self._n_mutations(self.target_chromosome, self.mutations)
# select parent chromosomes
parents = self._natural_selection(x, *m)
for _ in range(self.max_generation_depth):
if parents.shape[0] == 1:
break
m = self._reproduce_and_mutate(parents)
parents = self._natural_selection(x, *m)
return parents[0]
def _re_map(self, x: np.ndarray) -> np.ndarray:
""" Re-maps the optimised chromosome to the model profile.
Args:
x (np.ndarray): The input variables used for prediction.
Returns:
np.ndarray: A new optimised model profile.
"""
chromosome = np.nanmax(x, axis=0)
for l, v in zip(self.xnetwork.leaves, chromosome):
f, _id = l.split("_")
self.xnetwork.model._profile[int(f)][int(_id)][-4] = v
self.xnetwork.model._constructs[int(f)]._nodes[int(_id)][-4] = v
return self.xnetwork.model._profile
[docs]class Tighten(BaseLayer):
""" A leaf boosting algorithm to optimise XRegressor leaf node weights.
The Tighten layer uses a novel leaf boosting algorithm to optimise the
leaf weights of an XRegressor model. The algorithm works by iteratively
identifying the leaf node that will have the greatest impact on the
overall model score, and then incrementally increasing or decreasing
the leaf node weight to improve the model score. This process is repeated
until the maximum number of iterations is reached, or the early stopping
threshold is reached.
Args:
iterations (int): The number of iterations to run.
learning_rate (float): How fast the model learns. Between 0.001 - 1
early_stopping (int): Stop early if no improvement after n iters.
"""
def __init__(
self, iterations: int = 100, learning_rate: float = 0.03,
early_stopping: int = None):
super().__init__()
# store params
self.iterations = iterations
self.learning_rate = learning_rate
self.early_stopping = early_stopping
self.xnetwork = None
def _get_params(self) -> dict:
""" Returns the parameters of the layer.
Returns:
dict: The layer parameters.
"""
params = {
'iterations': self.iterations,
'learning_rate': self.learning_rate,
'early_stopping': self.early_stopping,
'metric': self.metric
}
return params
@property
def params(self) -> dict:
""" Returns the parameters of the layer.
Returns:
dict: The layer parameters.
"""
return self._get_params()
def _next_best_change(self, errors: np.array) -> tuple:
""" Identifies the most effective leaf node change to improve model.
Args:
errors (np.array): An array of errors for the current iteration.
Returns:
int: The index of the best leaf node.
float: The max amount the score can change.
"""
# load mask
mask = self.xnetwork._mask
# Build error map
errmp = (np.transpose([errors] * mask.shape[1])) * mask
# Set non-masked area to nan if zero
errmp[(~mask) & (errmp == 0)] = np.nan
# count even values
ec = (errmp == 0).sum(axis=0)
# Intantiate computation masks
o_mask = (errmp > 0)
u_mask = (errmp < 0)
# Identify leaves with over-indexed errors
o_vals = (errmp * o_mask)
o_vals[o_vals == 0] = np.nan
# Identify leaves with under-indexed errors
u_vals = (errmp * u_mask)
u_vals[u_vals == 0] = np.nan
if self.metric == 'mse':
o_vals = o_vals**2
u_vals = u_vals**2
# surpress know warning that raises when all values are nan
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
# get mean error and count of obs that are too high for each leaf
om = np.nanmean(o_vals, axis=0)
oc = o_mask.sum(axis=0)
# get mean error and count of obs that are too low for each leaf
um = np.nanmean(abs(u_vals), axis=0)
uc = u_mask.sum(axis=0)
# calculate max benefit of inc/dec each leaf
inc = (um * uc) - (um * oc) - (um * ec)
dec = (om * oc) - (om * uc) - (om * ec)
# get the best outcomes
bsts = np.nanmax([inc, dec])
# find individual best outcome and loc
bst = np.nanmax(bsts)
bstloc = np.where(bsts==bst)[0][0]
# get value to update by
if inc[bstloc] > dec[bstloc]:
change = math.sqrt(um[bstloc]) * self.learning_rate
else:
change = math.sqrt(om[bstloc]) * -1 * self.learning_rate
#change = get_change(bstloc) * self.learning_rate
return bstloc, change
def _run_iteration(self, x: np.ndarray, y: np.array):
""" Runs a single optimisation iteration.
Args:
x (np.ndarray): An array of transformed values.
y (np.array): An array of the true values.
Returns:
numpy.array: An array of transformed values after opt iteration.
"""
# calculate error
err = self._calculate_error(x, y)
# determine next best change
bstloc, change = self._next_best_change(err)
# apply best change
x[:, bstloc][~np.isnan(x[:, bstloc])] += change
# Update profile
f, _id = self.xnetwork.leaves[bstloc].split("_")
self.xnetwork.model._profile[int(f)][int(_id)][-4] += change
return x