实现 Mars 算子#
这篇文档里使用 read_csv
作为例子介绍如何实现一个 Mars 算子
定义算子类#
所有的 Mars 算子都继承自基类 Operand
,基类里定义了算子的基本属性,在每个大模块里都实现了对应的子类,比如 DataFrameOperand
、 TensorOperand
等。对于一个可以被 tile 的算子,它还需要继承 TileableOperandMixin
用以实现 tile
、execute
等必要的方法。所以首先我们可以先定义好算子类和它的初始化方法,指定算子的输出类型,并且在 __call__
函数里定义好创建 Mars DataFrame 的逻辑。
# NOTE: Use relative import if in Mars modules
from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
from mars.core import OutputType
from mars.serialization.serializables import StringField
class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
path = StringField('path')
def __init__(self, **kw):
super().__init__(**kw)
self._output_types=[OutputType.dataframe]
def __call__(self, index_value=None, columns_value=None,
dtypes=None, chunk_bytes=None):
# np.nan means its size is unknown on axis 0
shape = (np.nan, len(dtypes))
return self.new_dataframe(
None,
shape,
dtypes=dtypes,
index_value=index_value,
columns_value=columns_value,
chunk_bytes=chunk_bytes,
)
对于 SimpleReadCSV
算子,类持有的 path
属性记录 csv 的文件地址,这里使用 StringField
表示改属性的类型是字符串,指定类型主要是为了方便序列化算子,如果某个属性的类型是不确定的,可以用 AnyField
表示。
实现 tile 方法#
接下来需要实现 Tile 方法, 这个函数里主要是将该算子的计算任务分解成多个子任务。理想情况下,这些子任务可以并行的分发到不同的执行器中执行。对于当前的 read_csv
计算任务,每个子任务需要读取文件的不同块,所以在 tile 函数里我们需要计算出每个子任务读取文件块的偏移值以及读取长度。由于粗粒度算子与细粒度算子都使用同一个算子类表示,所以需要在刚刚定义好的算子中添加 offset
、length
等属性记录细粒度算子需要的一些信息。
import os
import numpy as np
from mars.core import OutputType
from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
from mars.serialization.serializables import AnyField, StringField, Int64Field
from mars.utils import parse_readable_size
class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
path = StringField("path")
chunk_bytes = AnyFiled('chunk_bytes')
offset = Int64Field("offset")
length = Int64Field("length")
def __init__(self, **kw):
super().__init__(**kw)
self._output_types=[OutputType.dataframe]
@classmethod
def tile(cls, op: "SimpleReadCSV"):
# out is operand's output in coarse-grained graph
out = op.outputs[0]
file_path = op.path
file_size = os.path.getsize(file_path)
# split file into chunks
chunk_bytes = int(parse_readable_size(op.chunk_bytes)[0])
offset = 0
index_num = 0
out_chunks = []
while offset < file_size:
chunk_op = op.copy().reset_key()
chunk_op.path = file_path
# offset and length for current chunk
chunk_op.offset = offset
chunk_op.length = min(chunk_bytes, file_size - offset)
# calculate chunk's meta, including shape, index_value, columns_value
# here we use np.nan to represent unknown shape
shape = (np.nan, len(out.dtypes))
# use `op.new_chunk` to create a dataframe chunk
new_chunk = chunk_op.new_chunk(
None,
shape=shape,
index=(index_num, 0),
index_value=out.index_value,
columns_value=out.columns_value,
dtypes=out.dtypes,
)
offset += chunk_bytes
index_num += 1
out_chunks.append(new_chunk)
# create a new tileable which holds `chunks` for generating fine-grained graph
new_op = op.copy()
# `nsplits` records the split info for each axis. For read_csv,
# the output dataframe is split into multiple parts on axis 0 and
# keep one chunk on axis 1, so the nsplits will be
# like ((np.nan, np.nan, ...), (out.shape[1],))
nsplits = ((np.nan,) * len(out_chunks), (out.shape[1],))
return new_op.new_dataframes(
None,
out.shape,
dtypes=out.dtypes,
index_value=out.index_value,
columns_value=out.columns_value,
chunks=out_chunks,
nsplits=nsplits,
)
实现 execute 方法#
当拆分好的子任务被分发到执行器时,Mars 会调用算子的 execute
方法来做计算,对于 read_csv
的子任务,在函数里需要根据 offset
和 length
读取对应的数据块,但是这两个值只是一个粗略的值,因为 csv 文件不能从一行的中间读取,所以每次执行的时候需要计算出分隔符所在的起始位置。
from io import BytesIO
import pandas as pd
from mars.dataframe.utils import build_empty_df
def _find_chunk_start_end(f, offset, size):
f.seek(offset)
if f.tell() == 0:
start = 0
else:
f.readline()
start = f.tell()
f.seek(offset + size)
f.readline()
end = f.tell()
return start, end
class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
@classmethod
def execute(cls, ctx: Union[dict, Context], op: "SimpleReadCSV"):
out = op.outputs[0]
with open(op.path, 'rb') as f:
start, end = _find_chunk_start_end(f, op.offset, op.length)
if end == start:
# the last chunk may be empty
data = build_empty_df(out.dtypes)
else:
f.seek(start)
if start == 0:
# The first chunk contains header, skip header rows
data = pd.read_csv(BytesIO(f.read(end - start)),
skiprows=1,
names=out.dtypes.index)
else:
data = pd.read_csv(BytesIO(f.read(end - start)),
names=out.dtypes.index)
ctx[out.key] = data
当我们通过 pd.read_csv
读取当前数据块后,可以将读到的数据存储在 ctx
中,这里 SimpleReadCSV
只有一个输出,对于 SVD
这样有多个输出的算子,可以通过outputs 不同的 key 存储对应的输出数据。
定义用户接口#
最后,需要定义一个暴露给用户的函数接口 read_csv
。在这个函数里,我们需要创建 SimpleReadCSV
算子,并且需要读取一小块采样数据,推断出输出的 DataFrame 的dtypes, columns, index 等元信息。
from mars.dataframe.utils import parse_index
def read_csv(file_path, chunk_bytes='16M'):
# use first 10 lines to infer
with open(file_path, 'rb') as f:
head_lines = b"".join([f.readline() for _ in range(10)])
mini_df = pd.read_csv(BytesIO(head_lines))
index_value = parse_index(mini_df.index)
columns_value = parse_index(mini_df.columns, store_data=True)
dtypes = mini_df.dtypes
op = SimpleReadCSV(path=file_path, chunk_bytes=chunk_bytes)
return op(
index_value=index_value,
columns_value=columns_value,
dtypes=dtypes,
chunk_bytes=chunk_bytes,
)
编写测试用例#
Mars 使用 pytest 运行测试用例,可以参考当前模块的单元测试,在 tests
目录下添加对应的用例。在测试函数里,需要使用共享的 fixture setup
来使得你的测试跑在测试环境中。
def test_simple_read_csv_execution(setup):
with tempfile.TemporaryDirectory() as tempdir:
file_path = os.path.join(tempdir, "test.csv")
# write to a csv file
raw = pd.DataFrame({
'int': range(10),
'float': np.random.rand(10),
'str': [f'value_{i}' for i in range(10)]
}).to_csv(file_path, index=False)
mdf = read_csv(file_path).execute().fetch()
pd.testing.assert_frame_equal(raw, mdf)
如果在本地测试通过,这时候我们可以在 GitHub 提交一个 pull requests,所有的测试会自动运行在 GitHub Actions 和 Azure Pipelines 这两个持续集成的平台上,如果所有的检测都通过了,意味着这个 pull requests 达到了合并的要求。
添加文档#
如果代码里增加了 API,那我们需要在 docs
目录下添加说明,可以参考相关 文档 完成。