Implement a Mars Operand#
We use `read_csv` as an example to illustrate how to implement a Mars operand.
Define Operand Class#
All Mars operands inherit from the base class `Operand`, which defines the basic properties of an operand. Each module has its own child class, such as `DataFrameOperand`, `TensorOperand`, etc. A tileable operand also needs to inherit from `TileableOperandMixin` to implement the `tile` and `execute` functions. So we first define the operand class and set the output types in the `__init__` method; the types can be DataFrame or Tensor, depending on the operand's output data. A `__call__` method is also needed for creating a Mars DataFrame.
```python
import numpy as np

# NOTE: Use relative import if in Mars modules
from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
from mars.core import OutputType
from mars.serialization.serializables import StringField


class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
    path = StringField('path')

    def __init__(self, **kw):
        super().__init__(**kw)
        self._output_types = [OutputType.dataframe]

    def __call__(self, index_value=None, columns_value=None,
                 dtypes=None, chunk_bytes=None):
        # np.nan means the size is unknown on axis 0
        shape = (np.nan, len(dtypes))
        return self.new_dataframe(
            None,
            shape,
            dtypes=dtypes,
            index_value=index_value,
            columns_value=columns_value,
            chunk_bytes=chunk_bytes,
        )
```
For the `SimpleReadCSV` operand, the property `path` is the path of the CSV file. We use a `StringField` to declare the property's type, which is useful for serialization. If the type is uncertain, `AnyField` will work.
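To see what a typed field buys us, here is a minimal descriptor sketch that mimics the idea behind a field like `StringField`. The `TypedField` and `MyOp` names are purely illustrative; this is not Mars' actual implementation, which also handles serialization, not just type checking:

```python
class TypedField:
    """Illustrative descriptor that validates the type of a property."""

    def __init__(self, name, typ):
        self.name, self.typ = name, typ

    def __set__(self, obj, value):
        # reject values of the wrong type, as a typed field would
        if value is not None and not isinstance(value, self.typ):
            raise TypeError(f"{self.name} expects {self.typ.__name__}")
        obj.__dict__[self.name] = value

    def __get__(self, obj, owner=None):
        if obj is None:
            return self
        return obj.__dict__.get(self.name)


class MyOp:
    # analogous to `path = StringField('path')`
    path = TypedField("path", str)
```

Assigning a non-string to `MyOp().path` raises a `TypeError`, which is the kind of guarantee that makes serialization of operand properties reliable.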
Implement Tile Method#
The `tile` method is the next goal. This method splits the computing task into several subtasks; ideally, these subtasks can be assigned to different executors and run in parallel. In the specific case of `read_csv`, each subtask reads a block of bytes from the file, so we calculate the offset and length of each block in the tile function. As we use the same class for both the coarse-grained and fine-grained operand, the `offset`, `length` and other properties are added to record information for the fine-grained operand.
```python
import os

import numpy as np

from mars.core import OutputType
from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
from mars.serialization.serializables import AnyField, StringField, Int64Field
from mars.utils import parse_readable_size


class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
    path = StringField("path")
    chunk_bytes = AnyField("chunk_bytes")
    offset = Int64Field("offset")
    length = Int64Field("length")

    def __init__(self, **kw):
        super().__init__(**kw)
        self._output_types = [OutputType.dataframe]

    @classmethod
    def tile(cls, op: "SimpleReadCSV"):
        # `out` is the operand's output in the coarse-grained graph
        out = op.outputs[0]

        file_path = op.path
        file_size = os.path.getsize(file_path)

        # split the file into blocks of `chunk_bytes` bytes
        chunk_bytes = int(parse_readable_size(op.chunk_bytes)[0])
        offset = 0
        index_num = 0
        out_chunks = []
        while offset < file_size:
            chunk_op = op.copy().reset_key()
            chunk_op.path = file_path
            # offset and length of the current chunk
            chunk_op.offset = offset
            chunk_op.length = min(chunk_bytes, file_size - offset)
            # calculate the chunk's meta, including shape, index_value
            # and columns_value; np.nan represents an unknown shape
            shape = (np.nan, len(out.dtypes))
            # use `chunk_op.new_chunk` to create a dataframe chunk
            new_chunk = chunk_op.new_chunk(
                None,
                shape=shape,
                index=(index_num, 0),
                index_value=out.index_value,
                columns_value=out.columns_value,
                dtypes=out.dtypes,
            )
            out_chunks.append(new_chunk)
            offset += chunk_bytes
            index_num += 1

        # create a new tileable which holds `chunks` for generating
        # the fine-grained graph
        new_op = op.copy()
        # `nsplits` records the split info for each axis. For read_csv,
        # the output dataframe is split into multiple parts on axis 0
        # and keeps one chunk on axis 1, so nsplits looks like
        # ((np.nan, np.nan, ...), (out.shape[1],))
        nsplits = ((np.nan,) * len(out_chunks), (out.shape[1],))
        return new_op.new_dataframes(
            None,
            out.shape,
            dtypes=out.dtypes,
            index_value=out.index_value,
            columns_value=out.columns_value,
            chunks=out_chunks,
            nsplits=nsplits,
        )
```
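The chunking arithmetic in `tile` can be checked in isolation. The following self-contained sketch reproduces the same loop; the helper name `split_blocks` is ours, not part of Mars:

```python
def split_blocks(file_size, chunk_bytes):
    """Return the (offset, length) pairs that cover file_size bytes,
    mirroring the while-loop in SimpleReadCSV.tile."""
    blocks = []
    offset = 0
    while offset < file_size:
        length = min(chunk_bytes, file_size - offset)
        blocks.append((offset, length))
        offset += chunk_bytes
    return blocks


# a 100-byte file split into 30-byte blocks: the last block is shorter
print(split_blocks(100, 30))  # [(0, 30), (30, 30), (60, 30), (90, 10)]
```

Note that the blocks tile the file exactly: the lengths always sum to `file_size`, and only the final block may be smaller than `chunk_bytes`.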
Implement Execute Method#
When a subtask is delivered to an executor, Mars calls the operand's `execute` method to perform the calculation. For `read_csv`, we need to read a block from the file according to `offset` and `length`. However, `offset` is only a rough position, since we cannot start reading a CSV file from the middle of a line, so we use `readline` to align the start and end to delimiter boundaries.
```python
from io import BytesIO
from typing import Union

import pandas as pd

from mars.core.context import Context
from mars.dataframe.utils import build_empty_df


def _find_chunk_start_end(f, offset, size):
    f.seek(offset)
    if f.tell() == 0:
        start = 0
    else:
        # skip the partial line the offset landed in
        f.readline()
        start = f.tell()
    f.seek(offset + size)
    f.readline()
    end = f.tell()
    return start, end


class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
    @classmethod
    def execute(cls, ctx: Union[dict, Context], op: "SimpleReadCSV"):
        out = op.outputs[0]
        with open(op.path, 'rb') as f:
            start, end = _find_chunk_start_end(f, op.offset, op.length)
            if end == start:
                # the last chunk may be empty
                data = build_empty_df(out.dtypes)
            else:
                f.seek(start)
                if start == 0:
                    # the first chunk contains the header, skip it
                    data = pd.read_csv(BytesIO(f.read(end - start)),
                                       skiprows=1,
                                       names=out.dtypes.index)
                else:
                    data = pd.read_csv(BytesIO(f.read(end - start)),
                                       names=out.dtypes.index)
        ctx[out.key] = data
```
After reading the chunk data with `pd.read_csv`, we store the result in `ctx`. `SimpleReadCSV` has only one output here; for an operand like `SVD` that has multiple outputs, we can store the results separately using the outputs' keys.
Define User Interface#
Finally, we define the `read_csv` function exposed to users. Besides creating a `SimpleReadCSV` operand, this function takes a sample of the data to infer some meta information of the Mars DataFrame, such as dtypes, columns and index.
```python
from io import BytesIO

import pandas as pd

from mars.dataframe.utils import parse_index


def read_csv(file_path, chunk_bytes='16M'):
    # use the first 10 lines to infer meta information
    with open(file_path, 'rb') as f:
        head_lines = b"".join([f.readline() for _ in range(10)])

    mini_df = pd.read_csv(BytesIO(head_lines))
    index_value = parse_index(mini_df.index)
    columns_value = parse_index(mini_df.columns, store_data=True)
    dtypes = mini_df.dtypes

    op = SimpleReadCSV(path=file_path, chunk_bytes=chunk_bytes)
    return op(
        index_value=index_value,
        columns_value=columns_value,
        dtypes=dtypes,
        chunk_bytes=chunk_bytes,
    )
```
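The sampling trick used above can be tried with plain pandas: read only the first few lines of a file and let pandas infer the dtypes from them. The CSV bytes here are made up for illustration:

```python
from io import BytesIO

import pandas as pd

csv_bytes = b"int,float,str\n1,1.5,a\n2,2.5,b\n3,3.5,c\n"

# read at most 10 lines, exactly as read_csv does
f = BytesIO(csv_bytes)
head_lines = b"".join(f.readline() for _ in range(10))
mini_df = pd.read_csv(BytesIO(head_lines))

# the int column is inferred as int64, float as float64, str as object
print(mini_df.dtypes)
```

The inferred dtypes, columns and index of this small sample are then assumed to hold for the whole file, which is why every chunk produced by `tile` can share the same meta.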
Write Tests#
Mars uses pytest for testing. We can add tests under the `tests` subdirectory of the specific module, following the existing test examples there. Define a test function and use the shared fixture `setup` to run it under the test environment.
```python
import os
import tempfile

import numpy as np
import pandas as pd


def test_simple_read_csv_execution(setup):
    with tempfile.TemporaryDirectory() as tempdir:
        file_path = os.path.join(tempdir, "test.csv")
        # write a csv file; note that `to_csv` returns None when given
        # a path, so keep a reference to the DataFrame for comparison
        raw = pd.DataFrame({
            'int': range(10),
            'float': np.random.rand(10),
            'str': [f'value_{i}' for i in range(10)],
        })
        raw.to_csv(file_path, index=False)

        mdf = read_csv(file_path).execute().fetch()
        pd.testing.assert_frame_equal(raw, mdf)
```
When the tests pass locally, we can submit a pull request on GitHub. The test suite runs automatically on the GitHub Actions and Azure Pipelines continuous integration services; once all checks pass, the pull request is up to the quality required for merging.
Documenting Your Code#
If the changes add APIs to Mars modules, we should document them in the `docs` directory, following the existing documentation there.