Integrate with PyTorch#

This guide describes how to integrate PyTorch with Mars.

Installation#

If you are trying out Mars on a single machine, e.g. on your laptop, make sure PyTorch is installed.

Install PyTorch via pip:

pip3 install torch torchvision torchaudio

Visit the PyTorch installation guide for more information.

On the other hand, if you plan to run Mars on a cluster, make sure PyTorch is installed on every worker.
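To verify the installation, you can check that PyTorch imports correctly; a minimal check (not Mars-specific):

import torch
print(torch.__version__)
print(torch.distributed.is_available())  # the scripts below use the distributed gloo backend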

Prepare data#

Here we use the ionosphere dataset; click the link to download the data.
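Once downloaded, you can take a quick look at the file with Mars DataFrame; a minimal sanity check, assuming the file is saved as ionosphere.data in the working directory:

import mars.dataframe as md

# the dataset has 34 numeric feature columns plus one label column ('g'/'b')
df = md.read_csv('ionosphere.data', header=None)
print(df.head(5).execute())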

Write a PyTorch script#

Now we create a Python file named torch_demo.py that contains the PyTorch processing logic.

import os

import mars.dataframe as md
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.optim as optim
import torch.utils.data
from sklearn.preprocessing import LabelEncoder


def prepare_data():
    df = md.read_csv('ionosphere.data', header=None)

    # split into input and output columns
    X = df.iloc[:, :-1].to_tensor().astype('float32')
    y = df.iloc[:, -1].to_tensor()

    # convert Mars tensor to numpy ndarray
    X, y = X.to_numpy(), y.to_numpy()

    # encode string to integer
    y = LabelEncoder().fit_transform(y)

    return X, y


def get_model():
    return nn.Sequential(
        nn.Linear(34, 10),
        nn.ReLU(),
        nn.Linear(10, 8),
        nn.ReLU(),
        nn.Linear(8, 1),
        nn.Sigmoid(),
    )


def train():
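    # Mars injects MASTER_ADDR / MASTER_PORT / RANK / WORLD_SIZE into the
    # environment, so the default env:// initialization below needs no
    # extra configuration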
    dist.init_process_group(backend="gloo")
    torch.manual_seed(42)

    data, labels = prepare_data()
    data = torch.from_numpy(data)
    labels = torch.from_numpy(labels)
    train_dataset = torch.utils.data.TensorDataset(data, labels.float())
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=32,
                                               shuffle=False,
                                               sampler=train_sampler)

    model = nn.parallel.DistributedDataParallel(get_model())
    optimizer = optim.Adam(model.parameters(),
                           lr=0.001)
    criterion = nn.BCELoss()

    for epoch in range(150):  # 150 epochs
        running_loss = 0.0
        for _, (batch_data, batch_labels) in enumerate(train_loader):
            outputs = model(batch_data)
            loss = criterion(outputs.squeeze(), batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        print(f"epoch {epoch}, running_loss is {running_loss}")


if __name__ == "__main__":
    train()

Modules like Mars DataFrame can be used directly inside the script to process large-scale data and speed up preprocessing.
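For instance, prepare_data() above could standardize the features with Mars tensor operations before materializing them as NumPy arrays; a sketch, where the column-wise standardization is an illustrative addition rather than part of the original script:

import mars.dataframe as md

def prepare_scaled_features():
    df = md.read_csv('ionosphere.data', header=None)
    X = df.iloc[:, :-1].to_tensor().astype('float32')
    # standardize each feature column; evaluated lazily and in parallel by Mars
    X = (X - X.mean(axis=0)) / (X.std(axis=0) + 1e-8)
    return X.to_numpy()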

Run a PyTorch script via Mars#

Now the PyTorch script can be submitted via run_pytorch_script().

In [1]: from mars.learn.contrib.pytorch import run_pytorch_script

In [2]: run_pytorch_script("torch_demo.py", n_workers=2)
task: <Task pending coro=<Event.wait() running at ./mars-dev/lib/python3.7/asyncio/locks.py:293> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f04c5027cd0>()]>>
...
epoch 148, running_loss is 0.27749747782945633
epoch 148, running_loss is 0.29025389067828655
epoch 149, running_loss is 0.2736152168363333
epoch 149, running_loss is 0.2884620577096939
Out[4]: Object <op=RunPyTorch, key=d5c40e502b77310ef359729692233d56>

Distributed training and inference#

For deployment, refer to the Deploy in a Cluster section; for running on Kubernetes, refer to the Deploy on Kubernetes section.

As you can see from torch_demo.py, Mars sets the relevant environment variables automatically. You do not need to worry about any distributed-specific setup; all you need to do is write a correct distributed PyTorch script.
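Concretely, dist.init_process_group(backend='gloo') with no explicit init method uses PyTorch's default env:// initialization, which reads MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE from the environment; these are the variables Mars fills in for each worker. A small snippet to inspect them inside the script (illustrative):

import os

for name in ('MASTER_ADDR', 'MASTER_PORT', 'RANK', 'WORLD_SIZE'):
    # printed values differ per worker; RANK identifies this process
    print(name, os.environ.get(name))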

Once a cluster is running, you can either set a default session, so that training and prediction are submitted to the cluster automatically, or explicitly specify the session to run on via session=***.

# A cluster has been configured, and web UI is started on <web_ip>:<web_port>
import mars
from mars.learn.contrib.pytorch import run_pytorch_script

# set the session as the default one
sess = mars.new_session('http://<web_ip>:<web_port>')

# submitted to cluster by default
run_pytorch_script('torch_demo.py', n_workers=2)

# Or, session could be specified as well
run_pytorch_script('torch_demo.py', n_workers=2, session=sess)

MarsDataset#

To make better use of Mars when processing data, we implemented a MarsDataset class that inherits from torch.utils.data.Dataset and accepts Mars objects (mars.tensor.Tensor, mars.dataframe.DataFrame, mars.dataframe.Series).

import mars.tensor as mt
import torch.utils.data
from mars.learn.contrib.pytorch import MarsDataset, RandomSampler

data = mt.random.rand(1000, 32, dtype='f4')
labels = mt.random.randint(0, 2, (1000, 10), dtype='f4')

train_dataset = MarsDataset(data, labels)
train_sampler = RandomSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=32,
                                           sampler=train_sampler)
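Iterating the loader then yields regular torch tensors, ready for a normal training loop; a quick shape check (illustrative):

batch_data, batch_labels = next(iter(train_loader))
# with the random data above, this should print torch.Size([32, 32]) torch.Size([32, 10])
print(batch_data.shape, batch_labels.shape)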

Now, run_pytorch_script() allows passing Mars objects into the script, so you can process the data with Mars first and then hand it over to the script.

import mars.dataframe as md
from sklearn.preprocessing import LabelEncoder


df = md.read_csv('ionosphere.data', header=None)
feature_data = df.iloc[:, :-1].astype('float32')
feature_data.execute()
labels = df.iloc[:, -1]
labels = LabelEncoder().fit_transform(labels.execute().fetch())
labels = labels.astype('float32')

run_pytorch_script(
    "torch_script.py", n_workers=2, data={'feature_data': feature_data, 'labels': labels},
    port=9945, session=sess)

torch_script.py:

from mars.learn.contrib.pytorch import DistributedSampler
from mars.learn.contrib.pytorch import MarsDataset
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.optim as optim
import torch.utils.data


def get_model():
    return nn.Sequential(
        nn.Linear(34, 10),
        nn.ReLU(),
        nn.Linear(10, 8),
        nn.ReLU(),
        nn.Linear(8, 1),
        nn.Sigmoid(),
    )


def train(feature_data, labels):
    dist.init_process_group(backend='gloo')
    torch.manual_seed(42)

    data = feature_data

    train_dataset = MarsDataset(data, labels)
    train_sampler = DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=32,
                                               shuffle=False,
                                               sampler=train_sampler)

    model = nn.parallel.DistributedDataParallel(get_model())
    optimizer = optim.Adam(model.parameters(),
                           lr=0.001)
    criterion = nn.BCELoss()

    for epoch in range(150):  # 150 epochs
        running_loss = 0.0
        for _, (batch_data, batch_labels) in enumerate(train_loader):
            outputs = model(batch_data)
            loss = criterion(outputs.squeeze(), batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"epoch {epoch}, running_loss is {running_loss}")


if __name__ == "__main__":
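    # Mars exposes each key of the data dict as a global variable in the script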
    feature_data = globals()['feature_data']
    labels = globals()['labels']
    train(feature_data, labels)

Output:

epoch 147, running_loss is 0.29225416854023933
epoch 147, running_loss is 0.28132784366607666
epoch 148, running_loss is 0.27749747782945633
epoch 148, running_loss is 0.29025389067828655
epoch 149, running_loss is 0.2736152168363333
epoch 149, running_loss is 0.2884620577096939
Out[7]: Object <op=RunPyTorch, key=dc3c7ab3a54a7289af15e8be5b334cf0>