# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import warnings
import numpy as np
from ... import tensor as mt
from ..utils.checks import assert_all_finite
from ..utils.multiclass import type_of_target
from ..utils.validation import check_consistent_length, column_or_1d
def auc(x, y, session=None, run_kwargs=None):
    """Compute Area Under the Curve (AUC) using the trapezoidal rule.

    This is a general function, given points on a curve. For computing the
    area under the ROC-curve, see :func:`roc_auc_score`. For an alternative
    way to summarize a precision-recall curve, see
    :func:`average_precision_score`.

    Parameters
    ----------
    x : tensor, shape = [n]
        x coordinates. These must be either monotonic increasing or monotonic
        decreasing.
    y : tensor, shape = [n]
        y coordinates.

    Returns
    -------
    auc : tensor, with float value

    Examples
    --------
    >>> import mars.tensor as mt
    >>> from mars.learn import metrics
    >>> y = mt.array([1, 1, 2, 2])
    >>> pred = mt.array([0.1, 0.4, 0.35, 0.8])
    >>> fpr, tpr, thresholds = metrics.roc_curve(y, pred, pos_label=2)
    >>> metrics.auc(fpr, tpr)
    0.75

    See also
    --------
    roc_auc_score : Compute the area under the ROC curve
    average_precision_score : Compute average precision from prediction scores
    precision_recall_curve :
        Compute precision-recall pairs for different probability thresholds
    """
    check_consistent_length(x, y)
    x = column_or_1d(x)
    y = column_or_1d(y)
    if x.shape[0] < 2:
        raise ValueError(
            "At least 2 points are needed to compute"
            f" area under curve, but x.shape = {x.shape}"
        )

    # The trapezoidal rule needs x sorted; accept a monotonically decreasing
    # x by flipping the sign of the area instead of sorting.
    direction = 1
    dx = mt.diff(x)
    any_dx_lt_0 = mt.any(dx < 0)
    all_dx_le_0 = mt.all(dx <= 0)
    # Evaluate the monotonicity predicates (and x itself, for the potential
    # error message) in a single execution pass.
    mt.ExecutableTuple([x, any_dx_lt_0, all_dx_le_0]).execute(
        session=session, **(run_kwargs or dict())
    )
    if any_dx_lt_0.fetch(session=session):
        if all_dx_le_0.fetch(session=session):
            direction = -1
        else:
            x_data = x.fetch(session=session)
            raise ValueError(f"x is neither increasing nor decreasing : {x_data}.")

    area = direction * mt.trapz(y, x)
    return area.execute(session=session, **(run_kwargs or dict()))
def _binary_clf_curve(
    y_true, y_score, pos_label=None, sample_weight=None, session=None, run_kwargs=None
):
    """Calculate true and false positives per binary classification threshold.

    Parameters
    ----------
    y_true : tensor, shape = [n_samples]
        True targets of binary classification
    y_score : tensor, shape = [n_samples]
        Estimated probabilities or decision function
    pos_label : int or str, default=None
        The label of the positive class
    sample_weight : array-like of shape (n_samples,), default=None
        Sample weights.

    Returns
    -------
    fps : tensor, shape = [n_thresholds]
        A count of false positives, at index i being the number of negative
        samples assigned a score >= thresholds[i]. The total number of
        negative samples is equal to fps[-1] (thus true negatives are given by
        fps[-1] - fps).
    tps : tensor, shape = [n_thresholds <= len(mt.unique(y_score))]
        An increasing count of true positives, at index i being the number
        of positive samples assigned a score >= thresholds[i]. The total
        number of positive samples is equal to tps[-1] (thus false negatives
        are given by tps[-1] - tps).
    thresholds : tensor, shape = [n_thresholds]
        Decreasing score values.
    """
    y_type = type_of_target(y_true).to_numpy(session=session, **(run_kwargs or dict()))
    if not (y_type == "binary" or (y_type == "multiclass" and pos_label is not None)):
        raise ValueError(f"{y_type} format is not supported")

    check_consistent_length(y_true, y_score, sample_weight)
    y_true = column_or_1d(y_true)
    y_score = column_or_1d(y_score)
    y_true = assert_all_finite(y_true, check_only=False)
    y_score = assert_all_finite(y_score, check_only=False)

    if sample_weight is not None:
        sample_weight = column_or_1d(sample_weight)

    # ensure binary classification if pos_label is not specified
    # classes.dtype.kind in ('O', 'U', 'S') is required to avoid
    # triggering a FutureWarning by calling np.array_equal(a, b)
    # when elements in the two arrays are not comparable.
    classes = mt.unique(y_true, aggregate_size=1).to_numpy(
        session=session, **(run_kwargs or dict())
    )
    if pos_label is None and (
        classes.dtype.kind in ("O", "U", "S")
        or not (
            np.array_equal(classes, [0, 1])
            or np.array_equal(classes, [-1, 1])
            or np.array_equal(classes, [0])
            or np.array_equal(classes, [-1])
            or np.array_equal(classes, [1])
        )
    ):
        classes_repr = ", ".join(repr(c) for c in classes)
        # Note: only the first fragment is an f-string, so literal braces in
        # the following plain-string fragments must be single, not doubled.
        raise ValueError(
            f"y_true takes value in {{{classes_repr}}} and "
            "pos_label is not specified: either make y_true "
            "take value in {0, 1} or {-1, 1} or "
            "pass pos_label explicitly."
        )
    elif pos_label is None:
        pos_label = 1.0

    # make y_true a boolean vector
    y_true = y_true == pos_label

    # sort scores and corresponding truth values
    desc_score_indices = mt.argsort(y_score, kind="mergesort")[::-1]
    y_score = y_score[desc_score_indices]
    y_true = y_true[desc_score_indices]
    if sample_weight is not None:
        weight = sample_weight[desc_score_indices]
    else:
        weight = 1.0

    # y_score typically has many tied values. Here we extract
    # the indices associated with the distinct values. We also
    # concatenate a value for the end of the curve.
    distinct_value_indices = mt.where(mt.diff(y_score))[0]
    threshold_idxs = mt.r_[distinct_value_indices, y_true.size - 1]

    # accumulate the true positives with decreasing threshold
    tps = (y_true * weight).cumsum()[threshold_idxs]
    if sample_weight is not None:
        # express fps as a cumsum to ensure fps is increasing even in
        # the presence of floating point errors
        fps = ((1 - y_true) * weight).cumsum()[threshold_idxs]
    else:
        # unweighted: false positives are simply the remaining samples
        fps = 1 + threshold_idxs - tps
    ret = mt.ExecutableTuple([fps, tps, y_score[threshold_idxs]])
    return ret.execute(session=session, **(run_kwargs or dict()))
def _binary_roc_auc_score(
    y_true, y_score, sample_weight=None, max_fpr=None, session=None, run_kwargs=None
):
    """Compute the ROC AUC score for a binary classification problem.

    Parameters
    ----------
    y_true : tensor, shape = [n_samples]
        True binary labels.
    y_score : tensor, shape = [n_samples]
        Target scores (probabilities or decision-function values).
    sample_weight : array-like of shape (n_samples,), default=None
        Sample weights.
    max_fpr : float, default=None
        If not None, the standardized partial AUC over the range
        [0, max_fpr] is returned (McClish correction).

    Returns
    -------
    score : float
    """
    if len(mt.unique(y_true).execute()) != 2:
        raise ValueError(
            "Only one class present in y_true. ROC AUC score "
            "is not defined in that case."
        )

    fpr, tpr, _ = roc_curve(
        y_true,
        y_score,
        sample_weight=sample_weight,
        session=session,
        run_kwargs=run_kwargs,
    )
    fpr, tpr = mt.ExecutableTuple([fpr, tpr]).fetch(session=session)
    if max_fpr is None or max_fpr == 1:
        return auc(fpr, tpr, session=session, run_kwargs=run_kwargs).fetch(
            session=session
        )
    if max_fpr <= 0 or max_fpr > 1:
        raise ValueError(f"Expected max_fpr in range (0, 1], got: {max_fpr}")

    # Add a single point at max_fpr by linear interpolation
    stop = (
        mt.searchsorted(fpr, max_fpr, "right")
        .execute(session=session, **(run_kwargs or dict()))
        .fetch(session=session)
    )
    x_interp = [fpr[stop - 1], fpr[stop]]
    y_interp = [tpr[stop - 1], tpr[stop]]
    tpr = list(tpr[:stop])
    # use the module-level numpy import rather than a redundant local one
    tpr.append(np.interp(max_fpr, x_interp, y_interp))
    fpr = list(fpr[:stop])
    fpr.append(max_fpr)
    partial_auc = auc(fpr, tpr, session=session, run_kwargs=run_kwargs)

    # McClish correction: standardize result to be 0.5 if non-discriminant
    # and 1 if maximal
    min_area = 0.5 * max_fpr ** 2
    max_area = max_fpr
    return 0.5 * (
        1 + (partial_auc.fetch(session=session) - min_area) / (max_area - min_area)
    )
def roc_curve(
    y_true,
    y_score,
    pos_label=None,
    sample_weight=None,
    drop_intermediate=True,
    session=None,
    run_kwargs=None,
):
    """Compute Receiver operating characteristic (ROC).

    Note: this implementation is restricted to the binary classification task.

    Read more in the :ref:`User Guide <roc_metrics>`.

    Parameters
    ----------
    y_true : tensor, shape = [n_samples]
        True binary labels. If labels are not either {-1, 1} or {0, 1}, then
        pos_label should be explicitly given.
    y_score : tensor, shape = [n_samples]
        Target scores, can either be probability estimates of the positive
        class, confidence values, or non-thresholded measure of decisions
        (as returned by "decision_function" on some classifiers).
    pos_label : int or str, default=None
        The label of the positive class.
        When ``pos_label=None``, if y_true is in {-1, 1} or {0, 1},
        ``pos_label`` is set to 1, otherwise an error will be raised.
    sample_weight : array-like of shape (n_samples,), default=None
        Sample weights.
    drop_intermediate : boolean, optional (default=True)
        Whether to drop some suboptimal thresholds which would not appear
        on a plotted ROC curve. This is useful in order to create lighter
        ROC curves.

        .. versionadded:: 0.17
           parameter *drop_intermediate*.

    Returns
    -------
    fpr : tensor, shape = [>2]
        Increasing false positive rates such that element i is the false
        positive rate of predictions with score >= thresholds[i].
    tpr : tensor, shape = [>2]
        Increasing true positive rates such that element i is the true
        positive rate of predictions with score >= thresholds[i].
    thresholds : tensor, shape = [n_thresholds]
        Decreasing thresholds on the decision function used to compute
        fpr and tpr. `thresholds[0]` represents no instances being predicted
        and is arbitrarily set to `max(y_score) + 1`.

    See also
    --------
    roc_auc_score : Compute the area under the ROC curve

    Notes
    -----
    Since the thresholds are sorted from low to high values, they
    are reversed upon returning them to ensure they correspond to both ``fpr``
    and ``tpr``, which are sorted in reversed order during their calculation.

    References
    ----------
    .. [1] `Wikipedia entry for the Receiver operating characteristic
            <https://en.wikipedia.org/wiki/Receiver_operating_characteristic>`_
    .. [2] Fawcett T. An introduction to ROC analysis[J]. Pattern Recognition
           Letters, 2006, 27(8):861-874.

    Examples
    --------
    >>> import mars.tensor as mt
    >>> from mars.learn import metrics
    >>> y = mt.array([1, 1, 2, 2])
    >>> scores = mt.array([0.1, 0.4, 0.35, 0.8])
    >>> fpr, tpr, thresholds = metrics.roc_curve(y, scores, pos_label=2)
    >>> fpr
    array([0. , 0. , 0.5, 0.5, 1. ])
    >>> tpr
    array([0. , 0.5, 0.5, 1. , 1. ])
    >>> thresholds
    array([1.8 , 0.8 , 0.4 , 0.35, 0.1 ])
    """
    from sklearn.exceptions import UndefinedMetricWarning

    fps, tps, thresholds = _binary_clf_curve(
        y_true,
        y_score,
        pos_label=pos_label,
        sample_weight=sample_weight,
        session=session,
        run_kwargs=run_kwargs,
    )

    # Attempt to drop thresholds corresponding to points in between and
    # collinear with other points. These are always suboptimal and do not
    # appear on a plotted ROC curve (and thus do not affect the AUC).
    # Here mt.diff(_, 2) is used as a "second derivative" to tell if there
    # is a corner at the point. Both fps and tps must be tested to handle
    # thresholds with multiple data points (which are combined in
    # _binary_clf_curve). This keeps all cases where the point should be kept,
    # but does not drop more complicated cases like fps = [1, 3, 7],
    # tps = [1, 2, 4]; there is no harm in keeping too many thresholds.
    if drop_intermediate and len(fps) > 2:
        optimal_idxs = mt.where(
            mt.r_[True, mt.logical_or(mt.diff(fps, 2), mt.diff(tps, 2)), True]
        )[0]
        fps = fps[optimal_idxs]
        tps = tps[optimal_idxs]
        thresholds = thresholds[optimal_idxs]

    # Add an extra threshold position
    # to make sure that the curve starts at (0, 0)
    tps = mt.r_[0, tps]
    fps = mt.r_[0, fps]
    thresholds = mt.r_[thresholds[0] + 1, thresholds]

    last_fps = fps[-1]
    last_tps = tps[-1]
    # Execute once, then fetch the scalar totals needed for the
    # degenerate-class checks below.
    mt.ExecutableTuple([tps, fps, last_fps, last_tps]).execute(
        session=session, **(run_kwargs or dict())
    )
    last_fps, last_tps = mt.ExecutableTuple([last_fps, last_tps]).fetch(session=session)
    if last_fps <= 0:
        warnings.warn(
            "No negative samples in y_true, "
            "false positive value should be meaningless",
            UndefinedMetricWarning,
        )
        fpr = mt.repeat(mt.nan, fps.shape)
    else:
        fpr = fps / last_fps

    if last_tps <= 0:
        warnings.warn(
            "No positive samples in y_true, "
            "true positive value should be meaningless",
            UndefinedMetricWarning,
        )
        tpr = mt.repeat(mt.nan, tps.shape)
    else:
        tpr = tps / last_tps

    ret = mt.ExecutableTuple([fpr, tpr, thresholds]).execute(
        session=session, **(run_kwargs or dict())
    )
    return ret