Source code for mars.tensor.random.core

# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import itertools
from collections.abc import Iterable
from contextlib import contextmanager

import numpy as np

from ...config import options
from ...serialize import ValueType, TupleField, Int32Field
from ...utils import tokenize
from ..core import TENSOR_TYPE, TENSOR_CHUNK_TYPE
from ..utils import decide_chunk_sizes, gen_random_seeds, broadcast_shape
from ..array_utils import array_module, device
from ..operands import TensorOperand, TensorMapReduceOperand, TensorOperandMixin
from ..datasource import tensor as astensor
from ..base import broadcast_to


[docs]class RandomState(object):
[docs] def __init__(self, seed=None): self._random_state = np.random.RandomState(seed=seed)
def seed(self, seed=None): """ Seed the generator. This method is called when `RandomState` is initialized. It can be called again to re-seed the generator. For details, see `RandomState`. Parameters ---------- seed : int or 1-d array_like, optional Seed for `RandomState`. Must be convertible to 32 bit unsigned integers. See Also -------- RandomState """ self._random_state.seed(seed=seed) def to_numpy(self): return self._random_state @classmethod def from_numpy(cls, np_random_state): state = RandomState() state._random_state = np_random_state return state @classmethod def _handle_size(cls, size): if size is None: return size try: return tuple(int(s) for s in size) except TypeError: return size,
_random_state = RandomState() def handle_array(arg): if not isinstance(arg, TENSOR_TYPE): if not isinstance(arg, Iterable): return arg arg = np.asarray(arg) return arg[(0,) * max(1, arg.ndim)] elif hasattr(arg, 'op') and hasattr(arg.op, 'data'): return arg.op.data[(0,) * max(1, arg.ndim)] return np.empty((0,), dtype=arg.dtype) class TensorRandomOperandMixin(TensorOperandMixin): __slots__ = () @classmethod def tile(cls, op): tensor = op.outputs[0] chunk_size = tensor.extra_params.raw_chunk_size or options.chunk_size nsplits = decide_chunk_sizes(tensor.shape, chunk_size, tensor.dtype.itemsize) fields = getattr(op, '_input_fields_', []) to_one_chunk_fields = set(getattr(op, '_into_one_chunk_fields_', list())) new_inputs = [] changed = False for field in fields: t = getattr(op, field) if not isinstance(t, TENSOR_TYPE): continue if field not in to_one_chunk_fields: t_nsplits = nsplits else: t_nsplits = t.shape # into 1 chunk rechunked = t.rechunk(t_nsplits) if rechunked is not t: rechunked._inplace_tile() changed = True new_inputs.append(rechunked) else: new_inputs.append(t) if changed: op.inputs = new_inputs idxes = list(itertools.product(*[range(len(s)) for s in nsplits])) seeds = gen_random_seeds(len(idxes), op.state) out_chunks = [] for seed, idx, shape in zip(seeds, idxes, itertools.product(*nsplits)): inputs = [] for inp in op.inputs: if len(inp.chunks) == 1: inputs.append(inp.chunks[0]) else: inputs.append(inp.cix[idx]) try: s = len(tuple(op.size)) size = shape[:s] except TypeError: if op.size is None: size = None else: size = shape[:1] except AttributeError: size = shape chunk_op = op.copy().reset_key() chunk_op._seed = int(seed) chunk_op._state = None chunk_op._size = size out_chunk = chunk_op.new_chunk(inputs, shape=shape, index=idx, order=tensor.order) out_chunks.append(out_chunk) new_op = op.copy() return new_op.new_tensors(op.inputs, tensor.shape, order=tensor.order, chunks=out_chunks, nsplits=nsplits, **tensor.extra_params) @classmethod def execute(cls, ctx, op): xp = array_module(op.gpu) if xp is np: device_id = -1 else: device_id = op.device or 0 get_val = lambda x: ctx[x.key] if isinstance(x, TENSOR_CHUNK_TYPE) else x with device(device_id): rs = xp.random.RandomState(op.seed) method_name = getattr(cls, '_func_name') try: if method_name in ('rand', 'randn'): try: res = getattr(rs, method_name)(*op.size, dtype=op.dtype) except TypeError: res = getattr(rs, method_name)(*op.size) elif method_name == 'randint': try: res = rs.randint(get_val(op.low), get_val(op.high), size=op.size, dtype=op.dtype) except TypeError: res = rs.randint(get_val(op.low), get_val(op.high), size=op.size) else: try: res = getattr(rs, method_name)(*(get_val(getattr(op, arg)) for arg in op.args), dtype=op.dtype) except TypeError: res = getattr(rs, method_name)(*(get_val(getattr(op, arg)) for arg in op.args)) if hasattr(res, 'dtype') and res.dtype != op.dtype: res = res.astype(op.dtype, copy=False) if xp is not np: ctx[op.outputs[0].key] = xp.asarray(res) else: ctx[op.outputs[0].key] = res except AttributeError: if xp is not np: # cupy cannot generate data, fallback to numpy rs = np.random.RandomState(op.seed) if method_name in ('rand', 'randn'): res = getattr(rs, method_name)(*op.size) else: res = getattr(rs, method_name)(*(get_val(getattr(op, arg)) for arg in op.args)) if res.dtype != op.dtype: res = res.astype(op.dtype, copy=False) ctx[op.outputs[0].key] = xp.asarray(res) else: raise def _calc_shape(self, shapes): shapes = list(shapes) if getattr(self, '_size', None) is not None: shapes.append(getattr(self, '_size')) return broadcast_shape(*shapes) @classmethod def _handle_arg(cls, arg, chunk_size): if isinstance(arg, (list, np.ndarray)): arg = astensor(arg, chunk_size=chunk_size) return arg @contextmanager def _get_inputs_shape_by_given_fields(self, inputs, shape, raw_chunk_size=None, tensor=True): fields = getattr(self, '_input_fields_', []) to_one_chunk_fields = set(getattr(self, '_into_one_chunk_fields_', list())) field_to_obj = dict() to_broadcast_shapes = [] if fields: if getattr(self, fields[0], None) is None: # create from beginning for field, val in zip(fields, inputs): if field not in to_one_chunk_fields: if isinstance(val, list): val = np.asarray(val) if tensor: val = self._handle_arg(val, raw_chunk_size) if isinstance(val, TENSOR_TYPE + TENSOR_CHUNK_TYPE): field_to_obj[field] = val if field not in to_one_chunk_fields: to_broadcast_shapes.append(val.shape) setattr(self, field, val) else: inputs_iter = iter(inputs) for field in fields: if isinstance(getattr(self, field), TENSOR_TYPE + TENSOR_CHUNK_TYPE): field_to_obj[field] = next(inputs_iter) if tensor: if shape is None: shape = self._calc_shape(to_broadcast_shapes) for field, inp in field_to_obj.items(): if field not in to_one_chunk_fields: field_to_obj[field] = broadcast_to(inp, shape) yield [field_to_obj[f] for f in fields if f in field_to_obj], shape inputs_iter = iter(getattr(self, '_inputs')) for field in fields: if field in field_to_obj: setattr(self, field, next(inputs_iter)) @classmethod def _get_shape(cls, kws, kw): if kw.get('shape') is not None: return kw.get('shape') elif kws is not None and len(kws) > 0: return kws[0].get('shape') def _new_tileables(self, inputs, kws=None, **kw): raw_chunk_size = kw.get('chunk_size', None) shape = self._get_shape(kws, kw) with self._get_inputs_shape_by_given_fields(inputs, shape, raw_chunk_size, True) as (inputs, shape): kw['shape'] = shape return super()._new_tileables(inputs, kws=kws, **kw) def _new_chunks(self, inputs, kws=None, **kw): shape = self._get_shape(kws, kw) with self._get_inputs_shape_by_given_fields(inputs, shape, None, False) as (inputs, shape): kw['shape'] = shape return super()._new_chunks(inputs, kws=kws, **kw) def _on_serialize_random_state(rs): return rs.get_state() if rs is not None else None def _on_deserialize_random_state(tup): rs = np.random.RandomState() rs.set_state(tup) return rs def RandomStateField(name, **kwargs): kwargs.update(dict(on_serialize=_on_serialize_random_state, on_deserialize=_on_deserialize_random_state)) return TupleField(name, **kwargs) class TensorSeedOperandMixin(object): @property def state(self): return getattr(self, '_state', None) @property def seed(self): return getattr(self, '_seed', None) @property def args(self): return [slot for slot in self.__slots__ if slot not in set(TensorRandomOperand.__slots__)] def _update_key(self): self._key = tokenize(type(self).__name__, *tuple(getattr(self, k, None) for k in self._keys_)) return self class TensorRandomOperand(TensorSeedOperandMixin, TensorOperand): _state = RandomStateField('state') _seed = Int32Field('seed') class TensorRandomMapReduceOperand(TensorSeedOperandMixin, TensorMapReduceOperand): _state = RandomStateField('state') _seed = Int32Field('seed') class TensorDistribution(TensorRandomOperand): _size = TupleField('size', ValueType.int64) @property def size(self): return self._size @classmethod def execute(cls, ctx, op): xp = array_module(op.gpu) if xp is np: device_id = -1 else: device_id = op.device or 0 with device(device_id): rs = xp.random.RandomState(op.seed) args = [] for k in op.args: val = getattr(op, k, None) if isinstance(val, TENSOR_CHUNK_TYPE): args.append(ctx[val.key]) else: args.append(val) method_name = getattr(cls, '_func_name') try: res = getattr(rs, method_name)(*args) if xp is not np: ctx[op.outputs[0].key] = xp.asarray(res) else: ctx[op.outputs[0].key] = res except AttributeError: if xp is not np: # cupy cannot generate, fall back to numpy rs = np.random.RandomState(op.seed) res = getattr(rs, method_name)(*args) ctx[op.outputs[0].key] = xp.asarray(res) else: raise class TensorSimpleRandomData(TensorRandomOperand): _size = TupleField('size', ValueType.int64) @property def size(self): return self._size