# Source code for mars.learn.metrics._ranking

# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings

import numpy as np

from ... import tensor as mt
from ..utils.checks import assert_all_finite
from ..utils.multiclass import type_of_target
from ..utils.validation import check_consistent_length, column_or_1d


def auc(x, y, session=None, run_kwargs=None):
    """Compute Area Under the Curve (AUC) using the trapezoidal rule.

    This is a general function, given points on a curve.  For computing the
    area under the ROC-curve, see :func:`roc_auc_score`.  For an alternative
    way to summarize a precision-recall curve, see
    :func:`average_precision_score`.

    Parameters
    ----------
    x : tensor, shape = [n]
        x coordinates. These must be either monotonic increasing or monotonic
        decreasing.
    y : tensor, shape = [n]
        y coordinates.

    Returns
    -------
    auc : tensor, with float value

    Examples
    --------
    >>> import mars.tensor as mt
    >>> from mars.learn import metrics
    >>> y = mt.array([1, 1, 2, 2])
    >>> pred = mt.array([0.1, 0.4, 0.35, 0.8])
    >>> fpr, tpr, thresholds = metrics.roc_curve(y, pred, pos_label=2)
    >>> metrics.auc(fpr, tpr)
    0.75

    See also
    --------
    roc_auc_score : Compute the area under the ROC curve
    average_precision_score : Compute average precision from prediction scores
    precision_recall_curve : Compute precision-recall pairs for different
        probability thresholds
    """
    check_consistent_length(x, y)
    x = column_or_1d(x)
    y = column_or_1d(y)
    if x.shape[0] < 2:
        raise ValueError(
            "At least 2 points are needed to compute"
            f" area under curve, but x.shape = {x.shape}"
        )

    kw = run_kwargs or dict()
    deltas = mt.diff(x)
    has_decrease = mt.any(deltas < 0)
    never_increases = mt.all(deltas <= 0)
    # Execute x alongside both monotonicity predicates so that the error
    # branch below can fetch x without triggering a second execution.
    mt.ExecutableTuple([x, has_decrease, never_increases]).execute(
        session=session, **kw
    )

    direction = 1
    if has_decrease.fetch(session=session):
        if not never_increases.fetch(session=session):
            # x is not monotonic in either direction — AUC is undefined.
            x_data = x.fetch(session=session)
            raise ValueError(f"x is neither increasing nor decreasing : {x_data}.")
        # Monotonically decreasing x: integrate with flipped sign.
        direction = -1

    area = direction * mt.trapz(y, x)
    return area.execute(session=session, **kw)
def _binary_clf_curve(
    y_true, y_score, pos_label=None, sample_weight=None, session=None, run_kwargs=None
):
    """Calculate true and false positives per binary classification threshold.

    Parameters
    ----------
    y_true : tensor, shape = [n_samples]
        True targets of binary classification

    y_score : tensor, shape = [n_samples]
        Estimated probabilities or decision function

    pos_label : int or str, default=None
        The label of the positive class

    sample_weight : array-like of shape (n_samples,), default=None
        Sample weights.

    Returns
    -------
    fps : tensor, shape = [n_thresholds]
        A count of false positives, at index i being the number of negative
        samples assigned a score >= thresholds[i]. The total number of
        negative samples is equal to fps[-1] (thus true negatives are given by
        fps[-1] - fps).

    tps : tensor, shape = [n_thresholds <= len(mt.unique(y_score))]
        An increasing count of true positives, at index i being the number
        of positive samples assigned a score >= thresholds[i]. The total
        number of positive samples is equal to tps[-1] (thus false negatives
        are given by tps[-1] - tps).

    thresholds : tensor, shape = [n_thresholds]
        Decreasing score values.
    """
    y_type = type_of_target(y_true).to_numpy(session=session, **(run_kwargs or dict()))
    if not (y_type == "binary" or (y_type == "multiclass" and pos_label is not None)):
        raise ValueError(f"{y_type} format is not supported")

    check_consistent_length(y_true, y_score, sample_weight)
    y_true = column_or_1d(y_true)
    y_score = column_or_1d(y_score)
    y_true = assert_all_finite(y_true, check_only=False)
    y_score = assert_all_finite(y_score, check_only=False)

    if sample_weight is not None:
        sample_weight = column_or_1d(sample_weight)

    # ensure binary classification if pos_label is not specified
    # classes.dtype.kind in ('O', 'U', 'S') is required to avoid
    # triggering a FutureWarning by calling np.array_equal(a, b)
    # when elements in the two arrays are not comparable.
    classes = mt.unique(y_true, aggregate_size=1).to_numpy(
        session=session, **(run_kwargs or dict())
    )
    if pos_label is None and (
        classes.dtype.kind in ("O", "U", "S")
        or not (
            np.array_equal(classes, [0, 1])
            or np.array_equal(classes, [-1, 1])
            or np.array_equal(classes, [0])
            or np.array_equal(classes, [-1])
            or np.array_equal(classes, [1])
        )
    ):
        classes_repr = ", ".join(repr(c) for c in classes)
        # NOTE: only the first fragment is an f-string and therefore needs
        # escaped braces; the plain-string fragments must use single braces,
        # otherwise the message literally shows "{{0, 1}}" to the user.
        raise ValueError(
            f"y_true takes value in {{{classes_repr}}} and "
            "pos_label is not specified: either make y_true "
            "take value in {0, 1} or {-1, 1} or "
            "pass pos_label explicitly."
        )
    elif pos_label is None:
        pos_label = 1.0

    # make y_true a boolean vector
    y_true = y_true == pos_label

    # sort scores and corresponding truth values
    desc_score_indices = mt.argsort(y_score, kind="mergesort")[::-1]
    y_score = y_score[desc_score_indices]
    y_true = y_true[desc_score_indices]
    if sample_weight is not None:
        weight = sample_weight[desc_score_indices]
    else:
        weight = 1.0

    # y_score typically has many tied values. Here we extract
    # the indices associated with the distinct values. We also
    # concatenate a value for the end of the curve.
    distinct_value_indices = mt.where(mt.diff(y_score))[0]
    threshold_idxs = mt.r_[distinct_value_indices, y_true.size - 1]

    # accumulate the true positives with decreasing threshold
    tps = (y_true * weight).cumsum()[threshold_idxs]
    if sample_weight is not None:
        # express fps as a cumsum to ensure fps is increasing even in
        # the presence of floating point errors
        fps = ((1 - y_true) * weight).cumsum()[threshold_idxs]
    else:
        fps = 1 + threshold_idxs - tps
    ret = mt.ExecutableTuple([fps, tps, y_score[threshold_idxs]])
    return ret.execute(session=session, **(run_kwargs or dict()))


def _binary_roc_auc_score(
    y_true, y_score, sample_weight=None, max_fpr=None, session=None, run_kwargs=None
):
    """Binary roc auc score."""
    from numpy import interp

    if len(mt.unique(y_true).execute()) != 2:
        raise ValueError(
            "Only one class present in y_true. ROC AUC score "
            "is not defined in that case."
        )

    fpr, tpr, _ = roc_curve(
        y_true,
        y_score,
        sample_weight=sample_weight,
        session=session,
        run_kwargs=run_kwargs,
    )
    fpr, tpr = mt.ExecutableTuple([fpr, tpr]).fetch(session=session)
    if max_fpr is None or max_fpr == 1:
        return auc(fpr, tpr, session=session, run_kwargs=run_kwargs).fetch(
            session=session
        )
    if max_fpr <= 0 or max_fpr > 1:
        raise ValueError(f"Expected max_fpr in range (0, 1], got: {max_fpr}")

    # Add a single point at max_fpr by linear interpolation
    stop = (
        mt.searchsorted(fpr, max_fpr, "right")
        .execute(session=session, **(run_kwargs or dict()))
        .fetch(session=session)
    )
    x_interp = [fpr[stop - 1], fpr[stop]]
    y_interp = [tpr[stop - 1], tpr[stop]]
    tpr = list(tpr[:stop])
    tpr.append(interp(max_fpr, x_interp, y_interp))
    fpr = list(fpr[:stop])
    fpr.append(max_fpr)
    partial_auc = auc(fpr, tpr, session=session, run_kwargs=run_kwargs)

    # McClish correction: standardize result to be 0.5 if non-discriminant
    # and 1 if maximal
    min_area = 0.5 * max_fpr ** 2
    max_area = max_fpr
    return 0.5 * (
        1 + (partial_auc.fetch(session=session) - min_area) / (max_area - min_area)
    )
def roc_curve(
    y_true,
    y_score,
    pos_label=None,
    sample_weight=None,
    drop_intermediate=True,
    session=None,
    run_kwargs=None,
):
    """Compute Receiver operating characteristic (ROC).

    Note: this implementation is restricted to the binary classification task.

    Read more in the :ref:`User Guide <roc_metrics>`.

    Parameters
    ----------
    y_true : tensor, shape = [n_samples]
        True binary labels. If labels are not either {-1, 1} or {0, 1}, then
        pos_label should be explicitly given.

    y_score : tensor, shape = [n_samples]
        Target scores, can either be probability estimates of the positive
        class, confidence values, or non-thresholded measure of decisions
        (as returned by "decision_function" on some classifiers).

    pos_label : int or str, default=None
        The label of the positive class.
        When ``pos_label=None``, if y_true is in {-1, 1} or {0, 1},
        ``pos_label`` is set to 1, otherwise an error will be raised.

    sample_weight : array-like of shape (n_samples,), default=None
        Sample weights.

    drop_intermediate : boolean, optional (default=True)
        Whether to drop some suboptimal thresholds which would not appear
        on a plotted ROC curve. This is useful in order to create lighter
        ROC curves.

        .. versionadded:: 0.17
           parameter *drop_intermediate*.

    Returns
    -------
    fpr : tensor, shape = [>2]
        Increasing false positive rates such that element i is the false
        positive rate of predictions with score >= thresholds[i].

    tpr : tensor, shape = [>2]
        Increasing true positive rates such that element i is the true
        positive rate of predictions with score >= thresholds[i].

    thresholds : tensor, shape = [n_thresholds]
        Decreasing thresholds on the decision function used to compute
        fpr and tpr. `thresholds[0]` represents no instances being predicted
        and is arbitrarily set to `max(y_score) + 1`.

    See also
    --------
    roc_auc_score : Compute the area under the ROC curve

    Notes
    -----
    Since the thresholds are sorted from low to high values, they
    are reversed upon returning them to ensure they correspond to both ``fpr``
    and ``tpr``, which are sorted in reversed order during their calculation.

    References
    ----------
    .. [1] `Wikipedia entry for the Receiver operating characteristic
            <https://en.wikipedia.org/wiki/Receiver_operating_characteristic>`_

    .. [2] Fawcett T. An introduction to ROC analysis[J]. Pattern Recognition
           Letters, 2006, 27(8):861-874.

    Examples
    --------
    >>> import mars.tensor as mt
    >>> from mars.learn import metrics
    >>> y = mt.array([1, 1, 2, 2])
    >>> scores = mt.array([0.1, 0.4, 0.35, 0.8])
    >>> fpr, tpr, thresholds = metrics.roc_curve(y, scores, pos_label=2)
    >>> fpr
    array([0. , 0. , 0.5, 0.5, 1. ])
    >>> tpr
    array([0. , 0.5, 0.5, 1. , 1. ])
    >>> thresholds
    array([1.8 , 0.8 , 0.4 , 0.35, 0.1 ])
    """
    from sklearn.exceptions import UndefinedMetricWarning

    kw = run_kwargs or dict()
    fps, tps, thresholds = _binary_clf_curve(
        y_true,
        y_score,
        pos_label=pos_label,
        sample_weight=sample_weight,
        session=session,
        run_kwargs=run_kwargs,
    )

    # Drop thresholds that are collinear with their neighbours on the curve:
    # they are always suboptimal and never appear on a plotted ROC curve (so
    # the AUC is unchanged).  mt.diff(_, 2) acts as a "second derivative" that
    # detects a corner at each point; both fps and tps are tested so that
    # thresholds covering multiple data points (merged in _binary_clf_curve)
    # are kept.  Some redundant points (e.g. fps = [1, 3, 7], tps = [1, 2, 4])
    # may survive, which is harmless.
    if drop_intermediate and len(fps) > 2:
        corner_idxs = mt.where(
            mt.r_[True, mt.logical_or(mt.diff(fps, 2), mt.diff(tps, 2)), True]
        )[0]
        fps, tps, thresholds = (
            fps[corner_idxs],
            tps[corner_idxs],
            thresholds[corner_idxs],
        )

    # Prepend an extra threshold position so the curve starts at (0, 0).
    tps = mt.r_[0, tps]
    fps = mt.r_[0, fps]
    thresholds = mt.r_[thresholds[0] + 1, thresholds]

    # Totals are needed client-side to decide the degenerate branches below,
    # so execute everything once and fetch only the two scalars.
    fps_total = fps[-1]
    tps_total = tps[-1]
    mt.ExecutableTuple([tps, fps, fps_total, tps_total]).execute(
        session=session, **kw
    )
    fps_total, tps_total = mt.ExecutableTuple([fps_total, tps_total]).fetch(
        session=session
    )

    if fps_total > 0:
        fpr = fps / fps_total
    else:
        # No negatives at all: every false-positive rate is undefined.
        warnings.warn(
            "No negative samples in y_true, "
            "false positive value should be meaningless",
            UndefinedMetricWarning,
        )
        fpr = mt.repeat(mt.nan, fps.shape)

    if tps_total > 0:
        tpr = tps / tps_total
    else:
        # No positives at all: every true-positive rate is undefined.
        warnings.warn(
            "No positive samples in y_true, "
            "true positive value should be meaningless",
            UndefinedMetricWarning,
        )
        tpr = mt.repeat(mt.nan, tps.shape)

    return mt.ExecutableTuple([fpr, tpr, thresholds]).execute(session=session, **kw)