实现 Mars 算子#

这篇文档里使用 read_csv 作为例子介绍如何实现一个 Mars 算子

定义算子类#

所有的 Mars 算子都继承自基类 Operand,基类里定义了算子的基本属性,在每个大模块里都实现了对应的子类,比如 DataFrameOperandTensorOperand 等。对于一个可以被 tile 的算子,它还需要继承 TileableOperandMixin 用以实现 tileexecute 等必要的方法。所以首先我们可以先定义好算子类和它的初始化方法,指定算子的输出类型,并且在 __call__ 函数里定义好创建 Mars DataFrame 的逻辑。

# NOTE: Use relative import if in Mars modules
from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
from mars.core import OutputType
from mars.serialization.serializables import StringField


class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
    path = StringField('path')

    def __init__(self, **kw):
        super().__init__(**kw)
        self._output_types=[OutputType.dataframe]

    def __call__(self, index_value=None, columns_value=None,
                 dtypes=None, chunk_bytes=None):
        # np.nan means its size is unknown on axis 0
        shape = (np.nan, len(dtypes))
        return self.new_dataframe(
            None,
            shape,
            dtypes=dtypes,
            index_value=index_value,
            columns_value=columns_value,
            chunk_bytes=chunk_bytes,
        )

对于 SimpleReadCSV 算子,类持有的 path 属性记录 csv 的文件地址,这里使用 StringField 表示改属性的类型是字符串,指定类型主要是为了方便序列化算子,如果某个属性的类型是不确定的,可以用 AnyField 表示。

实现 tile 方法#

接下来需要实现 Tile 方法, 这个函数里主要是将该算子的计算任务分解成多个子任务。理想情况下,这些子任务可以并行的分发到不同的执行器中执行。对于当前的 read_csv 计算任务,每个子任务需要读取文件的不同块,所以在 tile 函数里我们需要计算出每个子任务读取文件块的偏移值以及读取长度。由于粗粒度算子与细粒度算子都使用同一个算子类表示,所以需要在刚刚定义好的算子中添加 offsetlength 等属性记录细粒度算子需要的一些信息。

import os

import numpy as np

from mars.core import OutputType
from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
from mars.serialization.serializables import AnyField, StringField, Int64Field
from mars.utils import parse_readable_size


class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
    path = StringField("path")
    chunk_bytes = AnyFiled('chunk_bytes')
    offset = Int64Field("offset")
    length = Int64Field("length")

    def __init__(self, **kw):
        super().__init__(**kw)
        self._output_types=[OutputType.dataframe]

    @classmethod
    def tile(cls, op: "SimpleReadCSV"):
        # out is operand's output in coarse-grained graph
        out = op.outputs[0]

        file_path = op.path
        file_size = os.path.getsize(file_path)

        # split file into chunks
        chunk_bytes = int(parse_readable_size(op.chunk_bytes)[0])
        offset = 0
        index_num = 0
        out_chunks = []
        while offset < file_size:
            chunk_op = op.copy().reset_key()
            chunk_op.path = file_path
            # offset and length for current chunk
            chunk_op.offset = offset
            chunk_op.length = min(chunk_bytes, file_size - offset)
            # calculate chunk's meta, including shape, index_value, columns_value
            # here we use np.nan to represent unknown shape
            shape = (np.nan, len(out.dtypes))
            # use `op.new_chunk` to create a dataframe chunk
            new_chunk = chunk_op.new_chunk(
                None,
                shape=shape,
                index=(index_num, 0),
                index_value=out.index_value,
                columns_value=out.columns_value,
                dtypes=out.dtypes,
            )
            offset += chunk_bytes
            index_num += 1
            out_chunks.append(new_chunk)

        # create a new tileable which holds `chunks` for generating fine-grained graph
        new_op = op.copy()
        # `nsplits` records the split info for each axis. For read_csv,
        # the output dataframe is split into multiple parts on axis 0 and
        # keep one chunk on axis 1, so the nsplits will be
        # like ((np.nan, np.nan, ...), (out.shape[1],))
        nsplits = ((np.nan,) * len(out_chunks), (out.shape[1],))
        return new_op.new_dataframes(
            None,
            out.shape,
            dtypes=out.dtypes,
            index_value=out.index_value,
            columns_value=out.columns_value,
            chunks=out_chunks,
            nsplits=nsplits,
        )

实现 execute 方法#

当拆分好的子任务被分发到执行器时,Mars 会调用算子的 execute 方法来做计算,对于 read_csv 的子任务,在函数里需要根据 offsetlength 读取对应的数据块,但是这两个值只是一个粗略的值,因为 csv 文件不能从一行的中间读取,所以每次执行的时候需要计算出分隔符所在的起始位置。

from io import BytesIO

import pandas as pd

from mars.dataframe.utils import build_empty_df


def _find_chunk_start_end(f, offset, size):
    f.seek(offset)
    if f.tell() == 0:
        start = 0
    else:
        f.readline()
        start = f.tell()
    f.seek(offset + size)
    f.readline()
    end = f.tell()
    return start, end


class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
    @classmethod
    def execute(cls, ctx: Union[dict, Context], op: "SimpleReadCSV"):
        out = op.outputs[0]
        with open(op.path, 'rb') as f:
            start, end = _find_chunk_start_end(f, op.offset, op.length)
            if end == start:
                # the last chunk may be empty
                data = build_empty_df(out.dtypes)
            else:
                f.seek(start)
                if start == 0:
                    # The first chunk contains header, skip header rows
                    data = pd.read_csv(BytesIO(f.read(end - start)),
                                       skiprows=1,
                                       names=out.dtypes.index)
                else:
                    data = pd.read_csv(BytesIO(f.read(end - start)),
                                       names=out.dtypes.index)

        ctx[out.key] = data

当我们通过 pd.read_csv 读取当前数据块后,可以将读到的数据存储在 ctx 中,这里 SimpleReadCSV 只有一个输出,对于 SVD 这样有多个输出的算子,可以通过outputs 不同的 key 存储对应的输出数据。

定义用户接口#

最后,需要定义一个暴露给用户的函数接口 read_csv。在这个函数里,我们需要创建 SimpleReadCSV 算子,并且需要读取一小块采样数据,推断出输出的 DataFrame 的dtypes, columns, index 等元信息。

from mars.dataframe.utils import parse_index

def read_csv(file_path, chunk_bytes='16M'):
    # use first 10 lines to infer
    with open(file_path, 'rb') as f:
        head_lines = b"".join([f.readline() for _ in range(10)])

    mini_df = pd.read_csv(BytesIO(head_lines))
    index_value = parse_index(mini_df.index)
    columns_value = parse_index(mini_df.columns, store_data=True)
    dtypes = mini_df.dtypes
    op = SimpleReadCSV(path=file_path, chunk_bytes=chunk_bytes)
    return op(
        index_value=index_value,
        columns_value=columns_value,
        dtypes=dtypes,
        chunk_bytes=chunk_bytes,
    )

编写测试用例#

Mars 使用 pytest 运行测试用例,可以参考当前模块的单元测试,在 tests 目录下添加对应的用例。在测试函数里,需要使用共享的 fixture setup 来使得你的测试跑在测试环境中。

def test_simple_read_csv_execution(setup):
    with tempfile.TemporaryDirectory() as tempdir:
        file_path = os.path.join(tempdir, "test.csv")
        # write to a csv file
        raw = pd.DataFrame({
            'int': range(10),
            'float': np.random.rand(10),
            'str': [f'value_{i}' for i in range(10)]
        }).to_csv(file_path, index=False)
        mdf = read_csv(file_path).execute().fetch()
        pd.testing.assert_frame_equal(raw, mdf)

如果在本地测试通过,这时候我们可以在 GitHub 提交一个 pull requests,所有的测试会自动运行在 GitHub Actions 和 Azure Pipelines 这两个持续集成的平台上,如果所有的检测都通过了,意味着这个 pull requests 达到了合并的要求。

添加文档#

如果代码里增加了 API,那我们需要在 docs 目录下添加说明,可以参考相关 文档 完成。