# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pickle
from collections import OrderedDict, defaultdict

import numpy as np

from ....serialize import ValueType, DictField, KeyField, ListField
from ....operands import MergeDictOperand, OutputType
from .... import opcodes as OperandDef
from ....context import get_context, RunningMode
from .start_tracker import StartTracker
from .dmatrix import ToDMatrix


def _on_serialize_evals(evals_val):
    """Convert every entry of ``evals`` to a list before serialization.

    ``None`` is passed through unchanged.
    """
    if evals_val is None:
        return None
    return [list(x) for x in evals_val]


class XGBTrain(MergeDictOperand):
    """Operand that trains an XGBoost booster on a DMatrix.

    The inputs are laid out as ``[dtrain, *eval_dmatrices, tracker]`` where the
    evaluation matrices and the tracker are optional (see ``_set_inputs``).
    """

    _op_type_ = OperandDef.XGBOOST_TRAIN

    _params = DictField('params', key_type=ValueType.string)
    _dtrain = KeyField('dtrain')
    _evals = ListField('evals', on_serialize=_on_serialize_evals)
    _kwargs = DictField('kwargs', key_type=ValueType.string)
    _tracker = KeyField('tracker')

    def __init__(self, params=None, dtrain=None, evals=None, kwargs=None,
                 tracker=None, gpu=None, **kw):
        super().__init__(_params=params, _dtrain=dtrain, _evals=evals,
                         _kwargs=kwargs, _tracker=tracker, _gpu=gpu, **kw)
        if self.output_types is None:
            self.output_types = [OutputType.object]

    @property
    def params(self):
        return self._params

    @property
    def dtrain(self):
        return self._dtrain

    @property
    def evals(self):
        return self._evals

    @property
    def kwargs(self):
        return self._kwargs

    @property
    def tracker(self):
        return self._tracker

    def _set_inputs(self, inputs):
        """Rebind ``dtrain``, ``evals`` and ``tracker`` to the given inputs."""
        super()._set_inputs(inputs)
        self._dtrain = self._inputs[0]
        rest = self._inputs[1:]
        if self._tracker is not None:
            # the tracker, when present, is always the last input
            self._tracker = self._inputs[-1]
            rest = rest[:-1]
        if self._evals is not None:
            # keep the evaluation names, swap in the new input objects
            evals_dict = OrderedDict(self._evals)
            new_evals_dict = OrderedDict()
            for new_key, val in zip(rest, evals_dict.values()):
                new_evals_dict[new_key] = val
            self._evals = list(new_evals_dict.items())

    def __call__(self):
        """Create the tileable whose inputs are ``dtrain`` plus the eval matrices."""
        inputs = [self._dtrain]
        if self._evals is not None:
            inputs.extend(e[0] for e in self._evals)
        return self.new_tileable(inputs)

    @staticmethod
    def _get_dmatrix_chunks_workers(ctx, dmatrix):
        """Return, for each chunk of ``dmatrix``, the first worker in its meta."""
        # dmatrix_chunk.inputs is concat, and concat's input is the coallocated chunks
        metas = ctx.get_chunk_metas([c.inputs[0].inputs[0].key for c in dmatrix.chunks])
        return [m.workers[0] for m in metas]

    @staticmethod
    def _get_dmatrix_worker_to_chunk(dmatrix, workers, ctx):
        """Map worker -> chunk of ``dmatrix``, restricted to the given ``workers``.

        If several chunks are reported on the same worker, the last one wins.
        """
        worker_to_chunk = dict()
        expect_workers = set(workers)
        workers = XGBTrain._get_dmatrix_chunks_workers(ctx, dmatrix)
        for w, c in zip(workers, dmatrix.chunks):
            if w in expect_workers:
                worker_to_chunk[w] = c
        return worker_to_chunk

    @classmethod
    def tile(cls, op):
        """Tile the operand.

        Outside of distributed mode a single chunk is produced from the single
        chunk of every input.  In distributed mode one training chunk is created
        per ``dtrain`` chunk, pinned to that chunk's worker through
        ``_expect_worker`` and fed with the evaluation chunks found on the same
        worker plus a shared tracker chunk.
        """
        ctx = get_context()
        if ctx.running_mode != RunningMode.distributed:
            assert all(len(inp.chunks) == 1 for inp in op.inputs)

            chunk_op = op.copy().reset_key()
            out_chunk = chunk_op.new_chunk([inp.chunks[0] for inp in op.inputs],
                                           shape=(1,), index=(0,))
            new_op = op.copy()
            return new_op.new_tileables(op.inputs, chunks=[out_chunk],
                                        nsplits=((1,),))
        else:
            inp = op.inputs[0]
            in_chunks = inp.chunks
            workers = cls._get_dmatrix_chunks_workers(ctx, inp)
            tracker_chunk = StartTracker(n_workers=len(in_chunks)).new_chunk(
                in_chunks, shape=())
            out_chunks = []
            # group the evaluation chunks by the worker they are reported on
            worker_to_evals = defaultdict(list)
            if op.evals is not None:
                for dm, ev in op.evals:
                    worker_to_chunk = cls._get_dmatrix_worker_to_chunk(dm, workers, ctx)
                    for worker, chunk in worker_to_chunk.items():
                        worker_to_evals[worker].append((chunk, ev))
            for in_chunk, worker in zip(in_chunks, workers):
                chunk_op = op.copy().reset_key()
                chunk_op._expect_worker = worker
                chunk_op._tracker = tracker_chunk
                chunk_evals = list(worker_to_evals.get(worker, list()))
                chunk_op._evals = chunk_evals
                input_chunks = [in_chunk] + [pair[0] for pair in chunk_evals] \
                    + [tracker_chunk]
                out_chunk = chunk_op.new_chunk(input_chunks, shape=(np.nan,),
                                               index=in_chunk.index[:1])
                out_chunks.append(out_chunk)

            new_op = op.copy()
            # materialize the splits as a tuple: a bare generator expression here
            # would leave a one-shot generator object inside ``nsplits``
            return new_op.new_tileables(op.inputs, chunks=out_chunks,
                                        nsplits=(tuple(np.nan for _ in out_chunks),))

    @classmethod
    def execute(cls, ctx, op):
        """Run ``xgboost.train`` for one chunk and store the result in ``ctx``.

        The stored value is a dict with the pickled booster under ``'booster'``
        and the evaluation history under ``'history'``.  When a tracker is
        present, only the rabit rank 0 process keeps that dict; every other rank
        stores an empty dict.
        """
        if op.merge:
            return super().execute(ctx, op)

        from xgboost import train, rabit

        dtrain = ToDMatrix.get_xgb_dmatrix(ctx[op.dtrain.key])
        evals = tuple()
        if op.evals is not None:
            eval_dmatrices = [ToDMatrix.get_xgb_dmatrix(ctx[t[0].key]) for t in op.evals]
            evals = tuple((m, ev[1]) for m, ev in zip(eval_dmatrices, op.evals))
        # work on a copy so that injecting ``nthread`` does not modify the dict
        # held by the operand (which may be the very dict the caller passed in)
        params = dict(op.params)
        params['nthread'] = ctx.get_ncores() or -1
        # ``kwargs`` may be None; guard it for both branches below
        kwargs = dict() if op.kwargs is None else op.kwargs

        if op.tracker is None:
            # non distributed
            local_history = dict()
            bst = train(params, dtrain, evals=evals,
                        evals_result=local_history, **kwargs)
            ctx[op.outputs[0].key] = {'booster': pickle.dumps(bst),
                                      'history': local_history}
        else:
            # distributed
            rabit_args = ctx[op.tracker.key]
            rabit.init(rabit_args)
            try:
                local_history = dict()
                bst = train(params, dtrain, evals=evals,
                            evals_result=local_history, **kwargs)
                ret = {'booster': pickle.dumps(bst), 'history': local_history}
                if rabit.get_rank() != 0:
                    ret = {}
                ctx[op.outputs[0].key] = ret
            finally:
                rabit.finalize()


def train(params, dtrain, evals=(), **kwargs):
    """
    Train XGBoost model in Mars manner.

    Parameters
    ----------
    Parameters are the same as `xgboost.train`.

    In addition, ``session`` and ``run_kwargs`` may be given as keyword
    arguments; they are removed from ``kwargs`` and used to execute and fetch
    the result.  If ``evals_result`` is given, it is updated in place with the
    evaluation history.

    Returns
    -------
    results: Booster
    """
    evals_result = kwargs.pop('evals_result', dict())
    session = kwargs.pop('session', None)
    run_kwargs = kwargs.pop('run_kwargs', dict())
    op = XGBTrain(params=params, dtrain=dtrain, evals=evals, kwargs=kwargs)
    t = op()
    ret = t.execute(session=session, **run_kwargs).fetch(session=session)
    evals_result.update(ret['history'])
    # NOTE: the payload unpickled here is the one written by ``XGBTrain.execute``
    # via ``pickle.dumps``
    bst = pickle.loads(ret['booster'])
    num_class = params.get('num_class')
    if num_class:
        bst.set_attr(num_class=str(num_class))
    return bst