Source code for simdec.decomposition

from __future__ import annotations

from dataclasses import dataclass
from hashlib import blake2b
from typing import Literal

import numpy as np
import pandas as pd
from scipy import stats


__all__ = ["decomposition", "states_expansion"]


[docs] def states_expansion(states: list[int], inputs: pd.DataFrame) -> list[list[str]]: """Expand states list to fully represent all scenarios.""" inputs = pd.DataFrame(inputs) expanded_states = [] for state in states: if isinstance(state, int): if state == 2: expanded_states.append(["low", "high"]) elif state == 3: expanded_states.append(["low", "medium", "high"]) else: expanded_states.append([i for i in range(state)]) else: expanded_states.append(state) # categorical for a given variable cat_cols = inputs.select_dtypes(exclude=["number"]) cat_cols_idx = [] states_cats_ = [] for cat_col in cat_cols: _, cats = pd.factorize(inputs[cat_col]) cat_cols_idx.append(inputs.columns.get_loc(cat_col)) states_cats_.append(cats) for i, states_cat_ in zip(cat_cols_idx, states_cats_): n_unique = np.unique(inputs.iloc[:, i]).size expanded_states[i] = list(states_cat_) if n_unique < 5 else expanded_states[i] return expanded_states
@dataclass class DecompositionResult: var_names: list[str] statistic: np.ndarray bins: pd.DataFrame states: list[int] bin_edges: np.ndarray def __reduce__(self): h = blake2b(key=b"result hashing", digest_size=20) h.update(str(self.var_names).encode()) h.update(str(self.statistic).encode()) h.update(str(self.bins).encode()) h.update(str(self.states).encode()) h.update(str(self.bin_edges).encode()) return [h.hexdigest()]
[docs] def decomposition( inputs: pd.DataFrame, output: pd.DataFrame, *, sensitivity_indices: np.ndarray, dec_limit: float | None = None, auto_ordering: bool = True, states: list[int] | None = None, statistic: Literal["mean", "median"] | None = "mean", ) -> DecompositionResult: """SimDec decomposition. Parameters ---------- inputs : DataFrame of shape (n_runs, n_factors) Input variables. output : DataFrame of shape (n_runs, 1) or (n_runs,) Target variable. sensitivity_indices : ndarray of shape (n_factors, 1) Sensitivity indices, combined effect of each input. dec_limit : float Explained variance ratio to filter the number input variables. auto_ordering : bool Automatically order input columns based on the relative sensitivity_indices or use the provided order. states : list of int, optional List of possible states for the considered parameter. statistic : {"mean", "median"}, optional Statistic to compute in each bin. Returns ------- res : DecompositionResult An object with attributes: var_names : list of string (n_factors, 1) Variable names. statistic : ndarray of shape (n_factors, 1) Statistic in each bin. bins : DataFrame Multidimensional bins. states : list of int List of possible states for the considered parameter. """ var_names = inputs.columns cat_cols = inputs.select_dtypes(exclude=["number"]) for cat_col in cat_cols: codes, cat_states_ = pd.factorize(inputs[cat_col]) inputs[cat_col] = codes inputs = inputs.to_numpy() output = output.to_numpy().flatten() # 1. variables for decomposition var_order = np.argsort(sensitivity_indices)[::-1] # only keep the explained variance corresponding to `dec_limit` sensitivity_indices = sensitivity_indices[var_order] if auto_ordering: # handle edge case where sensitivity indices don't sum exactly to 1.0 if dec_limit is None: dec_limit = 0.8 * np.sum(sensitivity_indices) cumulative_sum = np.cumsum(sensitivity_indices) indices_over_limit = np.where(cumulative_sum >= dec_limit)[0] if indices_over_limit.size > 0: n_var_dec = indices_over_limit[0] + 1 else: n_var_dec = sensitivity_indices.size n_var_dec = max(1, n_var_dec) # keep at least one variable n_var_dec = min(5, n_var_dec) # use at most 5 variables else: n_var_dec = inputs.shape[1] # 2. variable selection and reordering if auto_ordering: var_names = var_names[var_order[:n_var_dec]].tolist() inputs = inputs[:, var_order[:n_var_dec]] else: var_names = var_names[:n_var_dec].tolist() inputs = inputs[:, :n_var_dec] # 3. states formation (after reordering/selection) if states is None: states = 3 if n_var_dec <= 2 else 2 states = [states] * n_var_dec for i in range(n_var_dec): n_unique = np.unique(inputs[:, i]).size states[i] = n_unique if n_unique <= 5 else states[i] # 4. decomposition bins = [] statistic_methods = { "mean": np.mean, "median": np.median, } try: statistic_method = statistic_methods[statistic] except KeyError: msg = f"'statistic' must be one of {statistic_methods.keys()}" raise ValueError(msg) def statistic_(inputs): """Custom function to keep track of the content of bins.""" bins.append(inputs) return statistic_method(inputs) # make bins with equal number of samples for a given dimension # sort and then split in n-state sorted_inputs = np.sort(inputs, axis=0) bin_edges = [] for i, states_ in enumerate(states): col = inputs[:, i] uniq = np.unique(col) # Categorical-like numeric inputs: if we have few unique numeric values, # build edges around the unique values so we don't create empty states. # We only apply this when the requested number of states matches the # number of categories (uniq.size). if uniq.size <= 5 and states_ == uniq.size: uniq = np.sort(uniq).astype(float) if uniq.size == 1: edges = np.array([uniq[0] - 0.5, uniq[0] + 0.5], dtype=float) else: gaps = np.diff(uniq) margin = 0.1 * np.min(gaps) edges = np.concatenate( ([uniq[0] - margin], uniq[:-1] + margin, [uniq[-1] + margin]) ).astype(float) bin_edges.append(edges) continue # Default: equal-number-of-samples bins splits = np.array_split(sorted_inputs[:, i], states_) edges = [s[0] for s in splits] edges.append(splits[-1][-1]) # last point to close the edges edges = np.array(edges, dtype=float) edges += 1e-10 * np.linspace(0, 1, len(edges)) bin_edges.append(edges) res = stats.binned_statistic_dd( inputs, values=output, statistic=statistic_, bins=bin_edges ) bins = pd.DataFrame(bins[1:]).T if len(bins.columns) != np.prod(states): # mismatch with the number of states vs bins # when it happens, we have NaNs in the statistic # we can add empty columns with NaNs on these positions as bins # then are not present for these states nan_idx = np.argwhere(np.isnan(res.statistic).flatten()).flatten() for idx in nan_idx: bins = np.insert(bins, idx, np.nan, axis=1) bins = pd.DataFrame(bins) return DecompositionResult( var_names=var_names, statistic=res.statistic, bins=bins, states=states, bin_edges=res.bin_edges, )