Implement a Mars Operand#

Use read_csv as an example to illustrate how to implement a Mars operand.

Define Operand Class#

All Mars operands inherit from the base class Operand, which defines the basic properties of an operand. Each module has its own child class, such as DataFrameOperand, TensorOperand, etc. A tileable operand also needs to inherit from TileableOperandMixin to implement the tile and execute functions. So we first define the operand class and set the output types in the __init__ method; the types can be DataFrame or Tensor, depending on the type of the operand's output data. A __call__ method is also needed for creating a Mars DataFrame.

# NOTE: Use relative import if in Mars modules
import numpy as np

from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
from mars.core import OutputType
from mars.serialization.serializables import StringField


class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
    path = StringField('path')

    def __init__(self, **kw):
        super().__init__(**kw)
        self._output_types = [OutputType.dataframe]

    def __call__(self, index_value=None, columns_value=None,
                 dtypes=None, chunk_bytes=None):
        # np.nan means its size is unknown on axis 0
        shape = (np.nan, len(dtypes))
        return self.new_dataframe(
            None,
            shape,
            dtypes=dtypes,
            index_value=index_value,
            columns_value=columns_value,
            chunk_bytes=chunk_bytes,
        )

For the SimpleReadCSV operand, the property path stores the path of the csv file. We use a StringField to declare the property's type, which is useful for serialization. If the type is uncertain, AnyField will work.

Implement Tile Method#

The tile method is the next goal. This method splits the computing task into several subtasks; ideally, these subtasks can be assigned to different executors in parallel. In the specific case of read_csv, each subtask reads a block of bytes from the file, so we need to calculate the offset and length of each block in the tile function. As we use the same class for both the coarse-grained and the fine-grained operand, offset, length and other properties are added to record information for the fine-grained operand.

import os

import numpy as np

from mars.core import OutputType
from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
from mars.serialization.serializables import AnyField, StringField, Int64Field
from mars.utils import parse_readable_size


class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
    path = StringField("path")
    chunk_bytes = AnyField("chunk_bytes")
    offset = Int64Field("offset")
    length = Int64Field("length")

    def __init__(self, **kw):
        super().__init__(**kw)
        self._output_types = [OutputType.dataframe]

    @classmethod
    def tile(cls, op: "SimpleReadCSV"):
        # out is operand's output in coarse-grained graph
        out = op.outputs[0]

        file_path = op.path
        file_size = os.path.getsize(file_path)

        # split file into chunks
        chunk_bytes = int(parse_readable_size(op.chunk_bytes)[0])
        offset = 0
        index_num = 0
        out_chunks = []
        while offset < file_size:
            chunk_op = op.copy().reset_key()
            chunk_op.path = file_path
            # offset and length for current chunk
            chunk_op.offset = offset
            chunk_op.length = min(chunk_bytes, file_size - offset)
            # calculate chunk's meta, including shape, index_value, columns_value
            # here we use np.nan to represent unknown shape
            shape = (np.nan, len(out.dtypes))
            # use `op.new_chunk` to create a dataframe chunk
            new_chunk = chunk_op.new_chunk(
                None,
                shape=shape,
                index=(index_num, 0),
                index_value=out.index_value,
                columns_value=out.columns_value,
                dtypes=out.dtypes,
            )
            offset += chunk_bytes
            index_num += 1
            out_chunks.append(new_chunk)

        # create a new tileable which holds `chunks` for generating fine-grained graph
        new_op = op.copy()
        # `nsplits` records the split info for each axis. For read_csv,
        # the output dataframe is split into multiple parts on axis 0 and
        # keep one chunk on axis 1, so the nsplits will be
        # like ((np.nan, np.nan, ...), (out.shape[1],))
        nsplits = ((np.nan,) * len(out_chunks), (out.shape[1],))
        return new_op.new_dataframes(
            None,
            out.shape,
            dtypes=out.dtypes,
            index_value=out.index_value,
            columns_value=out.columns_value,
            chunks=out_chunks,
            nsplits=nsplits,
        )
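The offset arithmetic in the loop above can be sanity-checked in isolation. The toy helper below is not part of Mars; it only reproduces how the file is cut into (offset, length) blocks:

```python
def split_file(file_size, chunk_bytes):
    # reproduce the (offset, length) pairs computed in tile
    blocks = []
    offset = 0
    while offset < file_size:
        blocks.append((offset, min(chunk_bytes, file_size - offset)))
        offset += chunk_bytes
    return blocks

# a 100-byte file split into 30-byte blocks: the last block is shorter
print(split_file(100, 30))
# -> [(0, 30), (30, 30), (60, 30), (90, 10)]
```

Note that the byte blocks only approximate row boundaries; the execute method below is responsible for snapping them to complete csv lines.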

Implement Execute Method#

When a subtask is delivered to an executor, Mars calls the operand's execute method to perform the calculation. For read_csv, we need to read a block from the file according to the offset and length. However, the offset is only a rough position, as we cannot start reading a csv file from the middle of a line, so we use readline to align the block's start and end with line boundaries.

from io import BytesIO
from typing import Union

import pandas as pd

from mars.core.context import Context
from mars.dataframe.utils import build_empty_df


def _find_chunk_start_end(f, offset, size):
    f.seek(offset)
    if f.tell() == 0:
        start = 0
    else:
        f.readline()
        start = f.tell()
    f.seek(offset + size)
    f.readline()
    end = f.tell()
    return start, end


class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
    @classmethod
    def execute(cls, ctx: Union[dict, Context], op: "SimpleReadCSV"):
        out = op.outputs[0]
        with open(op.path, 'rb') as f:
            start, end = _find_chunk_start_end(f, op.offset, op.length)
            if end == start:
                # the last chunk may be empty
                data = build_empty_df(out.dtypes)
            else:
                f.seek(start)
                if start == 0:
                    # The first chunk contains header, skip header rows
                    data = pd.read_csv(BytesIO(f.read(end - start)),
                                       skiprows=1,
                                       names=out.dtypes.index)
                else:
                    data = pd.read_csv(BytesIO(f.read(end - start)),
                                       names=out.dtypes.index)

        ctx[out.key] = data

After reading the chunk data with pd.read_csv, we store the result in ctx. SimpleReadCSV only has one output here; for an operand like SVD that has multiple outputs, we store each result separately under the corresponding output's key.

Define User Interface#

Finally, we define the read_csv function exposed to users. In this function, besides creating a SimpleReadCSV operand, a sample of the data is read to infer some meta information of the Mars DataFrame, such as dtypes, columns, index, etc.

from io import BytesIO

import pandas as pd

from mars.dataframe.utils import parse_index


def read_csv(file_path, chunk_bytes='16M'):
    # use first 10 lines to infer
    with open(file_path, 'rb') as f:
        head_lines = b"".join([f.readline() for _ in range(10)])

    mini_df = pd.read_csv(BytesIO(head_lines))
    index_value = parse_index(mini_df.index)
    columns_value = parse_index(mini_df.columns, store_data=True)
    dtypes = mini_df.dtypes
    op = SimpleReadCSV(path=file_path, chunk_bytes=chunk_bytes)
    return op(
        index_value=index_value,
        columns_value=columns_value,
        dtypes=dtypes,
        chunk_bytes=chunk_bytes,
    )
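The sampling step can be tried with plain pandas, independently of Mars: reading only the first few lines is enough for pandas to infer the column dtypes that the coarse-grained DataFrame needs.

```python
from io import BytesIO
import pandas as pd

# a stand-in for the first lines read from the real file
head_lines = b"int,float,str\n0,0.5,value_0\n1,1.5,value_1\n"
mini_df = pd.read_csv(BytesIO(head_lines))
print(mini_df.dtypes.to_dict())
# -> {'int': dtype('int64'), 'float': dtype('float64'), 'str': dtype('O')}
```

Because only a sample is inspected, the inferred dtypes may be wrong if later rows differ (e.g. an integer column that contains floats further down the file); real readers usually expose parameters to override the inference.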

Write Tests#

Mars uses pytest for testing. We can add tests under the tests subdirectory of the specific module, following the existing tests there as examples. Define a test function and use the shared fixture setup to run your tests in the test environment.

import os
import tempfile

import numpy as np
import pandas as pd


def test_simple_read_csv_execution(setup):
    with tempfile.TemporaryDirectory() as tempdir:
        file_path = os.path.join(tempdir, "test.csv")
        # write a csv file for the test
        raw = pd.DataFrame({
            'int': range(10),
            'float': np.random.rand(10),
            'str': [f'value_{i}' for i in range(10)]
        })
        raw.to_csv(file_path, index=False)
        mdf = read_csv(file_path).execute().fetch()
        pd.testing.assert_frame_equal(raw, mdf)

When the tests pass locally, we can submit a pull request on GitHub. The test suite runs automatically on the GitHub Actions and Azure Pipelines continuous integration services; once all checks have passed, the pull request is up to the quality required for merging.

Documenting Your Code#

If the changes add APIs to Mars modules, we should document them in the docs directory, following the relevant documentation guidelines.