
"""Utils for computing the fairness-accuracy Pareto frontier of a classifier.

"""

from __future__ import annotations

import os
import copy
import logging
import traceback
from functools import partial
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd

from .threshold_optimizer import RelaxedThresholdOptimizer
from .evaluation import evaluate_predictions, evaluate_predictions_bootstrap
from ._commons import join_dictionaries, get_cost_envelope, arrays_are_equal


DEFAULT_TOLERANCE_TICKS = np.hstack((
    np.arange(0.0, 0.2, 1e-2),      # [0.00, 0.01, 0.02, ..., 0.19]
    np.arange(0.2, 1.0, 1e-1),      # [0.20, 0.30, 0.40, ...]
))


def fit_and_evaluate_postprocessing(
    postproc_template: RelaxedThresholdOptimizer,
    tolerance: float,
    fit_data: tuple,
    eval_data: tuple | dict[tuple],
    seed: int = 42,
    y_fit_pred_scores: np.ndarray = None,  # pre-computed predictions on the fit data
    bootstrap: bool = True,
    **bootstrap_kwargs: dict,
) -> dict[str, dict]:
    """Fit and evaluate a postprocessing intervention on the given predictor.

    Parameters
    ----------
    postproc_template : RelaxedThresholdOptimizer
        An object that serves as the template to copy when creating the
        postprocessing optimizer.
    tolerance : float
        The tolerance (or slack) for fairness constraint fulfillment.
        This value will override the `tolerance` attribute of the
        `postproc_template` object.
    fit_data : tuple
        The data used to fit postprocessing.
    eval_data : tuple or dict[tuple]
        The data or sequence of data to evaluate postprocessing on.
        If a tuple is provided, it will be keyed as "test" in the returned
        results dictionary; if a dict is provided, it is assumed to map
        {<data_name>: <data_triplet>, ...}.
    seed : int, optional
        The random seed, by default 42.
    y_fit_pred_scores : np.ndarray, optional
        The pre-computed predicted scores for the `fit_data`; if provided,
        avoids re-computing these predictions on each function call.
    bootstrap : bool, optional
        Whether to use bootstrapping when computing metric results for
        postprocessing, by default True.
    bootstrap_kwargs : dict, optional
        Any extra arguments to pass on to the bootstrapping function.

    Returns
    -------
    results : dict[str, dict]
        A dictionary of results, whose keys are the data type, and values the
        metric values obtained by postprocessing on that data type.
        For example:

        >>> {
        >>>     "validation": {"accuracy": 0.7, "...": "..."},
        >>>     "test": {"accuracy": 0.65, "...": "..."},
        >>> }
    """
    clf = copy.copy(postproc_template)
    clf.tolerance = tolerance

    # Unpack data
    X_fit, y_fit, s_fit = fit_data

    logging.basicConfig(level=logging.WARNING, force=True)
    clf.fit(X=X_fit, y=y_fit, group=s_fit, y_scores=y_fit_pred_scores)

    results = {}

    # (Theoretical) fit results
    results["fit-theoretical"] = {
        "accuracy": 1 - clf.cost(1.0, 1.0),
        clf.constraint: clf.constraint_violation(),
    }

    ALLOWED_ABS_ERROR = 1e-5
    assert clf.constraint_violation() <= tolerance + ALLOWED_ABS_ERROR, \
        f"Got {clf.constraint_violation()} > {tolerance}"

    # Map of data_type -> data_tuple to evaluate postprocessing on
    data_to_eval = (
        {"fit": fit_data}
        | (eval_data if isinstance(eval_data, dict) else {"test": eval_data})
    )

    def _evaluate_on_data(data: tuple):
        """Helper function to evaluate on the given data tuple."""
        X, Y, S = data

        if bootstrap:
            # Default kwargs for bootstrapping
            kwargs = dict(
                confidence_pct=95,
                seed=seed,
                threshold=0.50,
            )
            # Update kwargs with any extra bootstrap kwargs
            kwargs.update(bootstrap_kwargs)

            eval_func = partial(
                evaluate_predictions_bootstrap,
                **kwargs,
            )
        else:
            eval_func = partial(
                evaluate_predictions,
                threshold=0.50,
            )

        return eval_func(
            y_true=Y,
            y_pred_scores=clf.predict(X, group=S),
            sensitive_attribute=S,
        )

    # Empirical results
    for data_type, data_tuple in data_to_eval.items():
        results[data_type] = _evaluate_on_data(data_tuple)

    return results
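
# Illustrative usage sketch (not part of the original API): fit and evaluate a
# single tolerance level. Assumes a fitted sklearn-style classifier `base_clf`
# and data triplets `(X_fit, y_fit, s_fit)` / `(X_test, y_test, s_test)` are
# defined elsewhere.
#
#     postproc = RelaxedThresholdOptimizer(
#         predictor=lambda X: base_clf.predict_proba(X)[:, -1],
#         constraint="equalized_odds",
#     )
#     results = fit_and_evaluate_postprocessing(
#         postproc_template=postproc,
#         tolerance=0.05,
#         fit_data=(X_fit, y_fit, s_fit),
#         eval_data={"test": (X_test, y_test, s_test)},
#         bootstrap=False,
#     )
#     print(results["fit-theoretical"])  # theoretical accuracy and constraint violation
#     print(results["test"])             # empirical metrics on the held-out data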


def compute_postprocessing_curve(
    model: object,
    fit_data: tuple,
    eval_data: tuple | dict[tuple],
    fairness_constraint: str = "equalized_odds",
    l_p_norm: int = np.inf,
    bootstrap: bool = True,
    tolerance_ticks: list = DEFAULT_TOLERANCE_TICKS,
    tolerance_tick_step: float = None,
    predict_method: str = "predict_proba",
    n_jobs: int = None,
    **kwargs,
) -> pd.DataFrame:
    """Computes the fairness and performance of the given classifier after
    adjusting (postprocessing) for varying levels of fairness tolerance.

    Parameters
    ----------
    model : object
        The model to use.
    fit_data : tuple
        Data triplet to use to fit the postprocessing intervention, (X, Y, S),
        respectively containing the features, labels, and sensitive attribute.
    eval_data : tuple or dict[tuple]
        Data triplet to evaluate the postprocessing intervention on (same
        format as `fit_data`), or a dictionary of <data_name> -> <data_triplet>
        containing multiple datasets to evaluate on.
    fairness_constraint : str, optional
        The fairness constraint to use, by default "equalized_odds".
    l_p_norm : int, optional
        The norm to use when computing the fairness constraint, by default
        np.inf. Note: only compatible with the "equalized_odds" constraint.
    bootstrap : bool, optional
        Whether to compute uncertainty estimates via bootstrapping, by default
        True.
    tolerance_ticks : list, optional
        List of constraint tolerances to use when computing the adjustment
        curve. By default, uses higher granularity/precision for lower levels
        of disparity, and lower granularity for higher levels of disparity.
        Should correspond to a sorted list of values between 0 and 1.
        Will be ignored if `tolerance_tick_step` is provided.
    tolerance_tick_step : float, optional
        Distance between constraint tolerances in the adjustment curve.
        Will override `tolerance_ticks` if provided!
    predict_method : str, optional
        Which method to call to obtain predictions from the given model.
        Use `predict_method="__call__"` for a callable predictor, or the
        default `predict_method="predict_proba"` for a predictor with the
        sklearn interface.
    n_jobs : int, optional
        Number of parallel jobs to use; if omitted, will use
        `os.cpu_count() - 1`.

    Returns
    -------
    postproc_results_df : pd.DataFrame
        A DataFrame containing the results, one row per tolerance tick.
    """
    def callable_predictor(X) -> np.ndarray:
        preds = getattr(model, predict_method)(X)
        assert 1 <= len(preds.shape) <= 2, f"Model outputs predictions in shape {preds.shape}"
        return preds if len(preds.shape) == 1 else preds[:, -1]

    # Pre-compute predictions on the fit data
    X_fit, _, _ = fit_data
    y_fit_pred_scores = callable_predictor(X_fit)

    postproc_template = RelaxedThresholdOptimizer(
        predictor=callable_predictor,
        constraint=fairness_constraint,
        l_p_norm=l_p_norm,
    )

    def _func_call(tol: float):
        try:
            return fit_and_evaluate_postprocessing(
                postproc_template=postproc_template,
                tolerance=tol,
                fit_data=fit_data,
                eval_data=eval_data,
                bootstrap=bootstrap,
                y_fit_pred_scores=y_fit_pred_scores,
                **kwargs)

        except Exception as exc:
            logging.error(
                f"FAILED `fit_and_evaluate_postprocessing(.)` with `tolerance={tol}`; "
                f"{''.join(traceback.TracebackException.from_exception(exc).format())}")
            return {}  # return empty dictionary

    # If n_jobs not provided: use number of CPU cores - 1
    if n_jobs is None:
        n_jobs = max(os.cpu_count() - 1, 1)
    logging.info(f"Using `n_jobs={n_jobs}` to compute adjustment curve.")

    from tqdm.auto import tqdm

    # Use `tolerance_tick_step` kwarg
    if tolerance_tick_step is not None:
        tolerances = np.arange(0.0, 1.0, tolerance_tick_step)

        if (
            # > `tolerance_ticks` was provided
            tolerance_ticks is not None
            # > and `tolerance_ticks` was set to a non-default value
            and not arrays_are_equal(tolerance_ticks, DEFAULT_TOLERANCE_TICKS)
        ):
            logging.error("Please provide only one of `tolerance_ticks` and `tolerance_tick_step`.")
            logging.warning("Use of `tolerance_tick_step` overrides the use of `tolerance_ticks`.")

    # Use `tolerance_ticks` kwarg
    else:
        tolerances = tolerance_ticks

    # Log tolerances used
    logging.info(f"Computing postprocessing for the following constraint tolerances: {tolerances}.")

    with ThreadPoolExecutor(max_workers=n_jobs) as executor:
        func_call_results = list(
            tqdm(
                executor.map(_func_call, tolerances),
                total=len(tolerances),
            )
        )

    results = dict(zip(tolerances, func_call_results))
    return _parse_postprocessing_curve(results)


def _parse_postprocessing_curve(postproc_curve_dict: dict) -> pd.DataFrame:
    """Parses the postprocessing curve dictionary results into a pd.DataFrame.

    Parameters
    ----------
    postproc_curve_dict : dict
        The result of computing the postprocessing adjustment curve on a model.

    Returns
    -------
    postproc_results_df : pd.DataFrame
        A DataFrame containing the results for each tolerance value.
    """
    return pd.DataFrame([
        join_dictionaries(
            {
                "tolerance": float(tol),
            },
            *[{
                f"{metric_name}_{data_type}": metric_value
                for data_type, results in results_at_tol.items()
                for metric_name, metric_value in results.items()
            }]
        )
        for tol, results_at_tol in postproc_curve_dict.items()
    ])
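
# For instance (illustrative), an input of the form
#     {0.05: {"fit": {"accuracy": 0.70}, "test": {"accuracy": 0.65}}}
# is flattened into a single row with columns
#     "tolerance", "accuracy_fit", "accuracy_test".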


def get_envelope_of_postprocessing_frontier(
    postproc_results_df: pd.DataFrame,
    perf_col: str = "accuracy_mean_test",
    disp_col: str = "equalized_odds_diff_mean_test",
    constant_clf_perf: float = 0.5,
    constant_clf_disp: float = 0.0,
) -> np.ndarray:
    """Computes the points in the envelope of the given postprocessing frontier results.

    Parameters
    ----------
    postproc_results_df : pd.DataFrame
        The postprocessing frontier results DataFrame.
    perf_col : str, optional
        Name of the column containing performance results, by default
        "accuracy_mean_test".
    disp_col : str, optional
        Name of the column containing disparity results, by default
        "equalized_odds_diff_mean_test".
    constant_clf_perf : float, optional
        The performance of a dummy constant classifier (in the same metric as
        `perf_col`), by default 0.5.
    constant_clf_disp : float, optional
        The disparity of a dummy constant classifier (in the same metric as
        `disp_col`), by default 0.0; assumes a constant classifier fulfills
        fairness!

    Returns
    -------
    np.ndarray
        A 2-D array containing the points in the convex hull of the Pareto
        curve.
    """
    # Add bottom-left point (postprocessing to a constant classifier is always trivial)
    postproc_results_df = pd.concat(
        objs=(
            pd.DataFrame(
                {
                    perf_col: [constant_clf_perf],
                    disp_col: [constant_clf_disp],
                },
            ),
            postproc_results_df,
        ),
        ignore_index=True,
    )

    # Make costs array
    costs = np.stack(
        (
            1 - postproc_results_df[perf_col],
            postproc_results_df[disp_col],
        ),
        axis=1,
    )

    # Get points in the envelope of the Pareto frontier
    costs_envelope = get_cost_envelope(costs)

    # Get original metric values back
    adjustment_frontier = np.stack(
        (
            1 - costs_envelope[:, 0],   # flip perf values back to the original metric
            costs_envelope[:, 1],       # keep disparity values (were already costs)
        ),
        axis=1,
    )

    # Sort by x-axis to plot properly (should already be sorted, but just making sure...)
    adjustment_frontier = adjustment_frontier[np.argsort(adjustment_frontier[:, 0])]

    return adjustment_frontier
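
# Illustrative usage sketch (not part of the original API): extract and plot
# the envelope of a computed postprocessing curve, assuming
# `postproc_results_df` was produced with `bootstrap=True` (so the default
# column names exist) and `y_test` holds binary labels.
#
#     frontier = get_envelope_of_postprocessing_frontier(
#         postproc_results_df,
#         perf_col="accuracy_mean_test",
#         disp_col="equalized_odds_diff_mean_test",
#         # accuracy of always predicting the majority class:
#         constant_clf_perf=max(y_test.mean(), 1 - y_test.mean()),
#     )
#
#     import matplotlib.pyplot as plt
#     plt.plot(frontier[:, 0], frontier[:, 1], marker="o")
#     plt.xlabel("accuracy")
#     plt.ylabel("equalized odds violation")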


def compute_inner_and_outer_adjustment_ci(
    postproc_results_df,
    perf_metric: str,
    disp_metric: str,
    data_type: str = "test",    # by default, fetch results on test data
    constant_clf_perf: float = None,
) -> tuple:
    """Computes the interior/inner and exterior/outer adjustment curves,
    corresponding to the confidence intervals (by default, 95% c.i.).

    Returns
    -------
    postproc_results_df : tuple[np.array, np.array, np.array]
        A tuple containing (xticks, inner_yticks, outer_yticks).
    """
    # Make INTERIOR/UPPER envelope of the adjustment frontier
    # (i.e., the WORST points, with lower performance and higher disparity)
    interior_adjusted_df = postproc_results_df.copy()
    interior_adjusted_df[perf_metric] = \
        interior_adjusted_df[f"{perf_metric}_low-percentile_{data_type}"]
    interior_adjusted_df[disp_metric] = \
        interior_adjusted_df[f"{disp_metric}_high-percentile_{data_type}"]

    # Make OUTER/BOTTOM envelope of the adjustment frontier
    # (i.e., the BEST points, with higher performance and lower disparity)
    outer_adjusted_df = postproc_results_df.copy()
    outer_adjusted_df[perf_metric] = \
        outer_adjusted_df[f"{perf_metric}_high-percentile_{data_type}"]
    outer_adjusted_df[disp_metric] = \
        outer_adjusted_df[f"{disp_metric}_low-percentile_{data_type}"]

    # Process each frontier
    interior_adj_frontier = get_envelope_of_postprocessing_frontier(
        interior_adjusted_df,
        perf_col=perf_metric,
        disp_col=disp_metric,
        constant_clf_perf=constant_clf_perf,
    )
    outer_adj_frontier = get_envelope_of_postprocessing_frontier(
        outer_adjusted_df,
        perf_col=perf_metric,
        disp_col=disp_metric,
        constant_clf_perf=constant_clf_perf,
    )

    # Create functions that interpolate points within each frontier (interior or outer),
    # because ax.fill_between requires both lines to share the same xticks.
    from scipy.interpolate import interp1d

    interior_adj_func = interp1d(
        x=interior_adj_frontier[:, 0],
        y=interior_adj_frontier[:, 1],
        bounds_error=False,
        fill_value=(
            np.min(interior_adj_frontier[:, 1]),
            np.max(interior_adj_frontier[:, 1]),
        ),
    )
    outer_adj_func = interp1d(
        x=outer_adj_frontier[:, 0],
        y=outer_adj_frontier[:, 1],
        bounds_error=False,
        fill_value=(
            np.min(outer_adj_frontier[:, 1]),
            np.max(outer_adj_frontier[:, 1]),
        ),
    )

    # Get common xticks (union of both frontiers' xticks)
    adjustment_frontier_xticks = np.sort(np.unique(np.hstack(
        (interior_adj_frontier[:, 0], outer_adj_frontier[:, 0])
    )))

    interior_frontier_yticks = np.array([interior_adj_func(x) for x in adjustment_frontier_xticks])
    outer_frontier_yticks = np.array([outer_adj_func(x) for x in adjustment_frontier_xticks])

    return adjustment_frontier_xticks, interior_frontier_yticks, outer_frontier_yticks
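
# Illustrative usage sketch (not part of the original API): shade the bootstrap
# confidence band around the adjustment frontier, assuming bootstrapped results
# in `postproc_results_df`, binary labels `y_test`, and a matplotlib axes `ax`.
#
#     xticks, inner_y, outer_y = compute_inner_and_outer_adjustment_ci(
#         postproc_results_df,
#         perf_metric="accuracy",
#         disp_metric="equalized_odds_diff",
#         data_type="test",
#         constant_clf_perf=max(y_test.mean(), 1 - y_test.mean()),
#     )
#     ax.fill_between(xticks, inner_y, outer_y, alpha=0.3)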