"""A set of functions to evaluate predictions on common performance
and fairness metrics, possibly at a specified FPR or FNR target.
Based on: https://github.com/AndreFCruz/hpt/blob/main/src/hpt/evaluation.py
"""
from __future__ import annotations
import logging
import statistics
from typing import Optional
from itertools import product
import numpy as np
from sklearn.metrics import confusion_matrix, log_loss, mean_squared_error, accuracy_score
from .roc_utils import compute_roc_point_from_predictions
from .binarize import compute_binary_predictions
from ._commons import join_dictionaries
def _is_valid_number(num) -> bool:
return isinstance(num, (float, int, np.number)) and not np.isnan(num)
def _safe_division(a: float, b: float, *, worst_result: float):
"""Tries to divide the given arguments and returns `worst_result` if unsuccessful."""
if b == 0 or not _is_valid_number(a) or not _is_valid_number(b):
logging.debug(f"Error in the following division: {a} / {b}")
return worst_result
else:
return a / b
def eval_accuracy_and_equalized_odds(
y_true: np.ndarray,
y_pred_binary: np.ndarray,
sensitive_attr: np.ndarray,
l_p_norm: int = np.inf,
display: bool = False,
) -> tuple[float, float]:
"""Evaluate accuracy and equalized odds of the given predictions.
Parameters
----------
y_true : np.ndarray
The true class labels.
y_pred_binary : np.ndarray
The predicted class labels.
sensitive_attr : np.ndarray
The sensitive attribute data.
l_p_norm : int, optional
The norm to use for the constraint violation, by default np.inf.
display : bool, optional
Whether to print results or not, by default False.
Returns
-------
tuple[float, float]
        A tuple of (accuracy, equalized odds violation).
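
    Examples
    --------
    Minimal usage sketch (illustrative values; assumes binary labels and a
    0/1-encoded binary sensitive attribute):

    >>> import numpy as np
    >>> y_true = np.array([0, 1, 1, 0, 1, 0])
    >>> y_pred = np.array([0, 1, 0, 0, 1, 1])
    >>> group = np.array([0, 0, 0, 1, 1, 1])
    >>> acc, eq_odds_violation = eval_accuracy_and_equalized_odds(
    ...     y_true, y_pred, group)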
"""
    unique_groups = np.unique(sensitive_attr)
    n_groups = len(unique_groups)

    # Compute each group's ROC point attained by the given predictions
    roc_points = [
        compute_roc_point_from_predictions(
            y_true[sensitive_attr == group],
            y_pred_binary[sensitive_attr == group])
        for group in unique_groups
    ]
    roc_points = np.vstack(roc_points)

    # Pairwise l-p distance between group-wise ROC points; the maximum is the
    # equalized-odds constraint violation
    lp_constraint_violation = [
        np.linalg.norm(roc_points[i] - roc_points[j], ord=l_p_norm)
        for i, j in product(range(n_groups), range(n_groups))
        if i < j
    ]

    acc_val = accuracy_score(y_true, y_pred_binary)
    eq_odds_violation = max(lp_constraint_violation)
if display:
print(f"\tAccuracy: {acc_val:.2%}")
print(f"\tUnfairness: {eq_odds_violation:.2%}")
return (acc_val, eq_odds_violation)
def evaluate_fairness(
y_true: np.ndarray,
y_pred: np.ndarray,
sensitive_attribute: np.ndarray,
    return_groupwise_metrics: bool = False,
) -> dict:
"""Evaluates fairness as the ratios between group-wise performance metrics.
Parameters
----------
y_true : np.ndarray
The true class labels.
y_pred : np.ndarray
The discretized predictions.
sensitive_attribute : np.ndarray
The sensitive attribute (protected group membership).
    return_groupwise_metrics : bool, optional
        Whether to also return the group-wise performance metrics (True) or
        only the ratios and differences between them (False), by default False.
Returns
-------
dict
A dictionary with key-value pairs of (metric name, metric value).
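
    Examples
    --------
    Minimal usage sketch (illustrative values; assumes binarized predictions
    and a 0/1-encoded sensitive attribute):

    >>> import numpy as np
    >>> y_true = np.array([0, 1, 1, 0, 1, 0])
    >>> y_pred = np.array([0, 1, 0, 0, 1, 1])
    >>> group = np.array([0, 0, 0, 1, 1, 1])
    >>> fairness_metrics = evaluate_fairness(y_true, y_pred, group)
    >>> "equalized_odds_diff" in fairness_metrics
    True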
"""
# All unique values for the sensitive attribute
unique_groups = np.unique(sensitive_attribute)
results = {}
groupwise_metrics = {}
unique_metrics = set()
# Helper to compute key/name of a group-wise metric
def group_metric_name(metric_name, group_name):
return f"{metric_name}_group={group_name}"
    assert (
        len(unique_groups) > 1
    ), f"Found a single unique sensitive attribute value: {unique_groups}"
for s_value in unique_groups:
# Indices of samples that belong to the current group
group_indices = np.argwhere(sensitive_attribute == s_value).flatten()
# Filter labels and predictions for samples of the current group
group_labels = y_true[group_indices]
group_preds = y_pred[group_indices]
# Evaluate group-wise performance
curr_group_metrics = evaluate_performance(group_labels, group_preds)
# Add group-wise metrics to the dictionary
groupwise_metrics.update(
{
group_metric_name(metric_name, s_value): metric_value
for metric_name, metric_value in curr_group_metrics.items()
}
)
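        # Accumulate the names of all metrics computed for any group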
unique_metrics = unique_metrics.union(curr_group_metrics.keys())
# Compute ratios and absolute diffs
for metric_name in unique_metrics:
curr_metric_results = [
groupwise_metrics[group_metric_name(metric_name, group_name)]
for group_name in unique_groups
]
# Metrics' ratio
ratio_name = f"{metric_name}_ratio"
# NOTE: should this ratio be computed w.r.t. global performance?
# - i.e., min(curr_metric_results) / global_curr_metric_result;
# - same question for the absolute diff calculations;
results[ratio_name] = _safe_division(
min(curr_metric_results), max(curr_metric_results),
worst_result=0,
)
# Metrics' absolute difference
diff_name = f"{metric_name}_diff"
results[diff_name] = max(curr_metric_results) - min(curr_metric_results)
# ** Equalized odds **
# default value: use maximum constraint violation for TPR and FPR equality
results["equalized_odds_ratio"] = min(
results["fnr_ratio"],
results["fpr_ratio"],
)
results["equalized_odds_diff"] = max(
results["tpr_diff"], # same as FNR diff
results["fpr_diff"], # same as TNR diff
)
# Evaluate equalized odds using other l-p norms
# (default value corresponds to l-infinity norm)
available_norms = [1, 2, np.inf]
for norm in available_norms:
metric_name = f"equalized_odds_diff_l{norm}"
results[metric_name] = max(
np.linalg.norm(
[
# TPR diff
groupwise_metrics[group_metric_name("tpr", group_a)]
- groupwise_metrics[group_metric_name("tpr", group_b)],
# FPR diff
groupwise_metrics[group_metric_name("fpr", group_a)]
- groupwise_metrics[group_metric_name("fpr", group_b)],
],
ord=norm,
)
for group_a, group_b in product(unique_groups, unique_groups)
if group_a < group_b
)
# Optionally, return group-wise metrics as well
if return_groupwise_metrics:
results.update(groupwise_metrics)
return results
def evaluate_predictions(
y_true: np.ndarray,
y_pred_scores: np.ndarray,
sensitive_attribute: Optional[np.ndarray] = None,
return_groupwise_metrics: bool = False,
**threshold_target,
) -> dict:
"""Evaluates the given predictions on both performance and fairness metrics.
Will only evaluate fairness if `sensitive_attribute` is provided.
Note
----
The value of `log_loss` may be inaccurate when using `scikit-learn<1.2`.
Parameters
----------
y_true : np.ndarray
The true labels.
y_pred_scores : np.ndarray
The predicted scores.
sensitive_attribute : np.ndarray, optional
The sensitive attribute - which protected group each sample belongs to.
If not provided, will not compute fairness metrics.
return_groupwise_metrics : bool
Whether to return groupwise performance metrics (requires providing
`sensitive_attribute`).
Returns
-------
dict
        A dictionary of (metric name, metric value) pairs.
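
    Examples
    --------
    Minimal usage sketch (illustrative values; assumes scores in [0, 1], a
    0/1-encoded sensitive attribute, and a fixed `threshold=0.5` as the
    threshold target):

    >>> import numpy as np
    >>> y_true = np.array([0, 1, 1, 0, 1, 0])
    >>> y_scores = np.array([0.2, 0.9, 0.4, 0.1, 0.8, 0.7])
    >>> group = np.array([0, 0, 0, 1, 1, 1])
    >>> metrics = evaluate_predictions(
    ...     y_true, y_scores,
    ...     sensitive_attribute=group,
    ...     threshold=0.5)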
"""
# Binarize predictions according to the given threshold target
y_pred_binary = compute_binary_predictions(
y_true,
y_pred_scores,
**threshold_target,
)
# Compute global performance metrics
results = evaluate_performance(y_true, y_pred_binary)
# Compute loss metrics
results.update(
{
"squared_loss": mean_squared_error(y_true, y_pred_scores),
"log_loss": log_loss(
y_true, y_pred_scores,
# eps=np.finfo(y_pred_scores.dtype).eps, # NOTE: for sklearn<1.2
# NOTE: this parameterization of `eps` is no longer useful as
# per sklearn 1.2, and will be removed in sklearn 1.5;
),
}
)
# (Optionally) Compute fairness metrics
if sensitive_attribute is not None:
results.update(
evaluate_fairness(
y_true,
y_pred_binary,
sensitive_attribute,
return_groupwise_metrics=return_groupwise_metrics,
)
)
return results
def evaluate_predictions_bootstrap(
y_true: np.ndarray,
y_pred_scores: np.ndarray,
sensitive_attribute: np.ndarray,
k: int = 200,
confidence_pct: float = 95,
seed: int = 42,
**threshold_target,
) -> dict:
"""Computes bootstrap estimates of several metrics for the given predictions.
Parameters
----------
y_true : np.ndarray
The true labels.
y_pred_scores : np.ndarray
The score predictions.
sensitive_attribute : np.ndarray
The sensitive attribute data.
k : int, optional
How many bootstrap samples to draw, by default 200.
confidence_pct : float, optional
How large of a confidence interval to use when reporting lower and upper
bounds, by default 95 (i.e., 2.5 to 97.5 percentile of results).
seed : int, optional
The random seed, by default 42.
Returns
-------
dict
        A dictionary containing bootstrap estimates (mean, standard deviation,
        and confidence-interval percentiles) for each metric.
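
    Examples
    --------
    Minimal usage sketch (randomly generated data for illustration; `k` is
    kept small here only to keep the example fast):

    >>> import numpy as np
    >>> rng = np.random.default_rng(42)
    >>> y_true = rng.integers(0, 2, size=100)
    >>> y_scores = rng.random(size=100)
    >>> group = rng.integers(0, 2, size=100)
    >>> bootstrap_results = evaluate_predictions_bootstrap(
    ...     y_true, y_scores, group, k=10)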
"""
assert len(y_true) == len(y_pred_scores)
rng = np.random.default_rng(seed=seed)
# Set threshold target if unset
threshold_target.setdefault("threshold", 0.50)
# Draw k bootstrap samples with replacement
results = []
for _ in range(k):
# Indices of current bootstrap sample
indices = rng.choice(len(y_true), replace=True, size=len(y_true))
# Evaluate predictions on this bootstrap sample
results.append(
evaluate_predictions(
y_true=y_true[indices],
y_pred_scores=y_pred_scores[indices],
sensitive_attribute=sensitive_attribute[indices],
**threshold_target,
)
)
# Compute statistics from bootstrapped results
all_metrics = set(results[0].keys())
bt_mean = {}
bt_stdev = {}
bt_percentiles = {}
low_percentile = (100 - confidence_pct) / 2
confidence_percentiles = [low_percentile, 100 - low_percentile]
for m in all_metrics:
metric_values = [r[m] for r in results]
bt_mean[m] = statistics.mean(metric_values)
bt_stdev[m] = statistics.stdev(metric_values)
bt_percentiles[m] = tuple(np.percentile(metric_values, confidence_percentiles))
    # Join all statistics into a single flat dictionary of results
return join_dictionaries(
*(
{
f"{metric}_mean": bt_mean[metric],
f"{metric}_stdev": bt_stdev[metric],
f"{metric}_low-percentile": bt_percentiles[metric][0],
f"{metric}_high-percentile": bt_percentiles[metric][1],
}
for metric in sorted(bt_mean.keys())
)
)