import os
import pickle

import numpy as np
from scipy import stats

from .utils import check_args, import_plt, call_shortcut

__all__ = ["SignTest", "SignedRankTest", "HierarchicalTest", "two_on_multiple"]

[docs]class Posterior: """ Sampled posterior distribution Args: sample (np.array): a 3 x `nsamples` array names (tuple of str or None): names of learning algorithms (default: `None`) """ def __init__(self, sample, *, names=None): self.sample = sample self.names = names
[docs] def probs(self, with_rope=True): """ Compute and return probabilities Args: with_rope (bool): tells whether the sample includes the probabilities for the rope region (default: `True`) Returns: `(p_left, p_rope, p_right)` if `with_rope=True`; otherwise `(p_left, p_right)`. """ winners = np.argmax(self.sample, axis=1) pl, pe, pr = np.bincount(winners, minlength=3) / len(winners) return (pl, pe, pr) if with_rope else (pl / (pl + pr), pr / (pl + pr))
[docs] def plot(self, names=None): """ Plot the posterior distribution. If there are samples in which the probability of `rope` is higher than 0.1, the distribution is shown in a simplex (see :obj:`plot_simplex`), otherwise as a histogram (:obj:`plot_histogram`). Args: names (tuple of str or `None`): names of classifiers Returns: matplotlib figure """ if np.max(self.sample[:, 1]) < 0.1: return self.plot_histogram(names) else: return self.plot_simplex(names)
[docs] def plot_simplex(self, names=None): """ Plot the posterior distribution in a simplex. The distribution is shown as a triangle with regions corresponding to first classifier having higher scores than the other by more than rope, the second having higher scores, or the difference being within the rope. Args: names (tuple of str): names of classifiers Returns: matplotlib figure """ plt = import_plt() from matplotlib.lines import Line2D def project(points): from math import sqrt, sin, cos, pi p1, p2, p3 = points.T / sqrt(3) x = (p2 - p1) * cos(pi / 6) + 0.5 y = p3 - (p1 + p2) * sin(pi / 6) + 1 / (2 * sqrt(3)) return np.vstack((x, y)).T fig, ax = plt.subplots() ax.set_aspect('equal', 'box') # triangle ax.add_line(Line2D([0, 0.5, 1.0, 0], [0, np.sqrt(3) / 2, 0, 0], color='orange')) names = names or self.names or ("C1", "C2") pl, pe, pr = self.probs() ax.text(0, -0.04, 'p({}) = {:.3f}'.format(names[0], pl), horizontalalignment='center', verticalalignment='top') ax.text(0.5, np.sqrt(3) / 2, 'p(rope) = {:.3f}'.format(pe), horizontalalignment='center', verticalalignment='bottom') ax.text(1, -0.04, 'p({}) = {:.3f}'.format(names[1], pr), horizontalalignment='center', verticalalignment='top') cx, cy = project(np.array([[0.3333, 0.3333, 0.3333]]))[0] for x, y in project(np.array([[.5, .5, 0], [.5, 0, .5], [0, .5, .5]])): ax.add_line(Line2D([cx, x], [cy, y], color='orange')) # project and draw points tripts = project(self.sample[:, [0, 2, 1]]) plt.hexbin(tripts[:, 0], tripts[:, 1], mincnt=1, # Leave some padding around the triangle for vertex labels ax.set_xlim(-0.2, 1.2) ax.set_ylim(-0.2, 1.2) ax.axis('off') return fig
[docs] def plot_histogram(self, names): """ Plot the posterior distribution as histogram. Args: names (tuple of str): names of classifiers Returns: matplotlib figure """ plt = import_plt() names = names or self.names or ("C1", "C2") points = self.sample[:, 2] pr = (np.sum(points > 0.5) + 0.5 * np.sum(points == 0.5)) / len(points) pl = 1 - pr fig, ax = plt.subplots() ax.grid(True) ax.hist(points, 50, color="#34ccff") ax.axis(xmin=0, xmax=1) ax.text(0, 0, "\n\np({}) = {:.3f}".format(names[0], pl), horizontalalignment='left', verticalalignment='top') ax.text(1, 0, "\n\np({}) = {:.3f}".format(names[1], pr), horizontalalignment='right', verticalalignment='top') ax.get_yaxis().set_ticklabels([]) ax.axvline(x=0.5, color="#ffad2f", linewidth=2) return fig
[docs]class Test: LEFT, ROPE, RIGHT = range(3) def __new__(cls, x, y, rope=0, *, nsamples=50000, **kwargs): return Posterior(cls.sample(x, y, rope, nsamples=nsamples, **kwargs))
[docs] @classmethod def sample(cls, x, y, rope=0, nsamples=50000, **kwargs): """ Compute a sample of posterior distribution. Derived classes override this method to implement specific sampling methods. Derived methods may have additional arguments. Args: x (np.array): a vector of scores for the first model y (np.array): a vector of scores for the second model rope (float): the width of the region of practical equivalence (default: 0) nsamples (int): the number of samples (default: 50000) Returns: np.array of shape (`nsamples`, 3) """ pass
[docs] @classmethod def probs(cls, x, y, rope=0, *, nsamples=50000, **kwargs): """ Compute and return probabilities Args: x (np.array): a vector of scores for the first model y (np.array): a vector of scores for the second model rope (float): the width of the region of practical equivalence (default: 0) nsamples (int): the number of samples (default: 50000) Returns: `(p_left, p_rope, p_right)` if `rope > 0`; otherwise `(p_left, p_right)`. """ # new returns an instance of Posterior, not Test # pylint: disable=no-value-for-parameter return cls(x, y, rope, nsamples=nsamples, **kwargs).probs(rope > 0)
[docs] @classmethod def plot(cls, x, y, rope, *, nsamples=50000, names=None, **kwargs): """ Plot the posterior distribution. If there are samples in which the probability of `rope` is higher than 0.1, the distribution is shown in a simplex (see :obj:`plot_simplex`), otherwise as a histogram (:obj:`plot_histogram`). Args: x (np.array): a vector of scores for the first model y (np.array): a vector of scores for the second model rope (float): the width of the region of practical equivalence (default: 0) nsamples (int): the number of samples (default: 50000) names (tuple of str): names of classifiers Returns: matplotlib figure """ # pylint: disable=no-value-for-parameter return cls(x, y, rope, nsamples=nsamples, **kwargs).plot(names)
[docs] @classmethod def plot_simplex(cls, x, y, rope, *, nsamples=50000, names=None, **kwargs): """ Plot the posterior distribution in a simplex. The distribution is shown as a triangle with regions corresponding to first classifier having higher scores than the other by more than rope, the second having higher scores, or the difference being within the rope. Args: x (np.array): a vector of scores for the first model y (np.array): a vector of scores for the second model nsamples (int): the number of samples (default: 50000) names (tuple of str): names of classifiers Returns: matplotlib figure """ # pylint: disable=no-value-for-parameter return cls(x, y, rope, nsamples=nsamples, **kwargs).plot_simplex(names)
[docs] @classmethod def plot_histogram(cls, x, y, *, nsamples=50000, names=None, **kwargs): """ Plot the posterior distribution as histogram. Args: x (np.array): a vector of scores for the first model y (np.array): a vector of scores for the second model nsamples (int): the number of samples (default: 50000) names (tuple of str): names of classifiers Returns: matplotlib figure """ # pylint: disable=no-value-for-parameter return cls(x, y, rope=0, nsamples=nsamples, **kwargs)\ .plot_histogram(names)
[docs]class SignTest(Test): """ Compute a Bayesian sign test (`A Bayesian Wilcoxon signed-rank test based on the Dirichlet process <>`_, A. Benavoli et al, ICML 2014). Argument `prior` can give a strength (as `float`) of a prior put on the rope region, or a tuple with prior's position and strength, for instance `(SignTest.LEFT, 1.0)`. Position can be `SignTest.LEFT`, `SignTest.ROPE` or `SignTest.RIGHT`. """ @classmethod # pylint: disable=arguments-differ def sample(cls, x, y, rope=0, *, prior=1, nsamples=50000): if isinstance(prior, tuple): prior, prior_place = prior else: prior_place = cls.ROPE check_args(x, y, rope, prior, nsamples) diff = y - x nleft = sum(diff < -rope) nright = sum(diff > rope) nrope = len(diff) - nleft - nright alpha = np.array([nleft, nrope, nright], dtype=float) alpha += 0.0001 # for numerical stability alpha[prior_place] += prior return np.random.dirichlet(alpha, nsamples)
[docs]class SignedRankTest(Test): """ Compute a Bayesian signed-rank test (`A Bayesian Wilcoxon signed-rank test based on the Dirichlet process <>`_, A. Benavoli et al, ICML 2014). Arguments `x` and `y` should be one-dimensional arrays with average performances across data sets. These can be obtained using any sampling method, not necessarily cross validation. """ @classmethod # pylint: disable=arguments-differ def sample(cls, x, y, rope=0, *, prior=0.5, nsamples=50000): def heaviside(a, thresh): return (a > thresh).astype(float) + 0.5 * (a == thresh).astype(float) def diff_sums(x, y): diff = np.hstack(([0], y - x)) diff_m = np.lib.stride_tricks.as_strided( diff, strides=diff.strides + (0,), shape=diff.shape * 2) weights = np.ones(len(diff)) weights[0] = prior return diff_m + diff_m.T, weights def with_rope(): sums, weights = diff_sums(x, y) above_rope = heaviside(sums, 2 * rope) below_rope = heaviside(-sums, 2 * rope) samples = np.zeros((nsamples, 3)) for i, samp_weights in enumerate(np.random.dirichlet(weights, nsamples)): prod_weights = np.outer(samp_weights, samp_weights) samples[i, 0] = np.sum(prod_weights * below_rope) samples[i, 2] = np.sum(prod_weights * above_rope) samples[:, 1] = -samples[:, 0] - samples[:, 2] + 1 return samples def without_rope(): sums, weights = diff_sums(x, y) above_0 = heaviside(sums, 0) samples = np.zeros((nsamples, 3)) for i, samp_weights in enumerate(np.random.dirichlet(weights, nsamples)): prod_weights = np.outer(samp_weights, samp_weights) samples[i, 2] = np.sum(prod_weights * above_0) samples[:, 0] = -samples[:, 2] + 1 return samples check_args(x, y, rope, nsamples) return with_rope() if rope > 0 else without_rope()
[docs]class HierarchicalTest(Test): """ Compute a hierarchical t test. (`Statistical comparison of classifiers through Bayesian hierarchical modelling <>`_, G. Corani et al, Machine Learning, 2017). Arguments `x` and `y` should be two-dimensional arrays; rows correspond to data sets and elements within rows are scores obtained by (possibly repeated) cross-validation(s). """ @classmethod # pylint: disable=arguments-differ # pylint: disable=unused-argument def sample(cls, x, y, rope, *, runs=1, lower_alpha=1, upper_alpha=2, lower_beta=0.01, upper_beta=0.1, upper_sigma=1000, chains=4, nsamples=None): try: import pystan except ImportError: raise ImportError("Hierarchical model requires 'pystan'; " "install it by 'pip install pystan'") LAST_SAMPLE_PICKLE = "last-sample.pickle" STAN_MODEL_PICKLE = "stored-stan-model.pickle" stan_file = os.path.join(os.path.split(__file__)[0], "hierarchical-t-test.stan") args_signature = ( hash(tuple(tuple(e for e in row) for row in y - x)), runs, lower_alpha, upper_alpha, lower_beta, upper_beta, upper_sigma) def try_unpickle(fname): if os.path.exists(fname) \ and os.path.getmtime(fname) > os.path.getmtime(stan_file): with open(fname, "rb") as f: return pickle.load(f) return None def try_pickle(obj, fname): try: with open(fname, "wb") as f: pickle.dump(obj, f) except PermissionError: pass def scaled_data(x, y, rope): # ensure homogenous scale across all data seta diff = y - x stds = np.std(diff, axis=1) std_diff = np.mean(stds) return rope / std_diff, diff / std_diff def prepare_stan_data(diff): ndatasets, nscores = diff.shape nfolds = nscores / runs # avoid numerical problems with zero variance nscores_2 = nscores // 2 for sample in diff: if np.var(sample) == 0: sample[:nscores_2] = np.random.uniform(-rope, rope, nscores_2) sample[nscores_2:] = -sample[:nscores_2] std_within = np.mean(np.std(diff, axis=1)) # may be different from std_diff! std_among = np.std(np.mean(diff, axis=1)) if ndatasets > 1 else std_within maxdiff = np.max(np.abs(diff)) return dict( x=diff, Nsamples=nscores, q=ndatasets, rho=1 / nfolds, deltaLow=-maxdiff, deltaHi=maxdiff, lowerAlpha=lower_alpha, upperAlpha=upper_alpha, lowerBeta=lower_beta, upperBeta=upper_beta, stdLow=0, stdHi=std_within * upper_sigma, std0Low=0, std0Hi=std_among * upper_sigma ) def get_stan_model(model_file): stan_model = try_unpickle(STAN_MODEL_PICKLE) if stan_model is None: model_code = open(model_file).read() stan_model = pystan.StanModel(model_code=model_code) try_pickle(stan_model, STAN_MODEL_PICKLE) return stan_model def run_stan(diff): stan_data = prepare_stan_data(diff) # check if the last pickled result can be reused cached = try_unpickle(LAST_SAMPLE_PICKLE) if cached is not None and cached[0] == args_signature: return cached[1] model = get_stan_model(stan_file) fit = model.sampling(data=stan_data, chains=chains) results = fit.extract(permuted=True) mu = results["delta0"] stdh = results["std0"] nu = results["nu"] try_pickle((args_signature, (mu, stdh, nu)), LAST_SAMPLE_PICKLE) return mu, stdh, nu rope, diff = scaled_data(x, y, rope) mu, stdh, nu = run_stan(diff) samples = np.empty((len(nu), 3)) for mui, std, df, sample_row in zip(mu, stdh, nu, samples): sample_row[2] = 1 - stats.t.cdf(rope, df, mui, std) sample_row[0] = stats.t.cdf(-rope, df, mui, std) sample_row[1] = 1 - sample_row[0] - sample_row[2] return samples
[docs]def two_on_multiple(x, y, rope=0, *, runs=1, names=None, plot=False, **kwargs): """ Compute probabilities using a Bayesian signed-ranks test (if `x` and `y` or one-dimensional) or a hierarchical (if they are two-dimensions), and, optionally, draw a histogram. The hierarchical test assumes that the classifiers were evaluated using cross validation; argument `runs` gives the number of repetitions of cross-validation. For more details, see :obj:`SignRankTest` and :obj:`HierarchicalTest` Args: x (np.array): a vector of scores for the first model y (np.array): a vector of scores for the second model rope (float): the width of the region of practical equivalence (default: 0) runs (int): the number of repetitions of cross validation (for hierarhical model) (default: 1) nsamples (int): the number of samples (default: 50000) plot (bool): if `True`, the function also return a histogram (default: False) names (tuple of str): names of classifiers (ignored if `plot` is `False`) Returns: `(p_left, p_rope, p_right)` if `rope > 0`; otherwise `(p_left, p_right)`. If `plot=True`, the function also returns a matplotlib figure, that is, `((p_left, p_rope, p_right), fig)` """ if x.ndim == 2: test = HierarchicalTest kwargs["runs"] = runs else: test = SignedRankTest return call_shortcut(test, x, y, rope, names=names, plot=plot, **kwargs)