Integrate with PyTorch#

This introduction gives a brief tour of how to integrate PyTorch with Mars.

Installation#

If you are trying to use Mars on a single machine, e.g. on your laptop, make sure PyTorch is installed.

You can install PyTorch via pip:

pip3 install torch torchvision torchaudio

Visit the installation guide for PyTorch for more information.
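To verify the installation, print the PyTorch version:

python3 -c "import torch; print(torch.__version__)"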

On the other hand, if you plan to use Mars on a cluster, make sure PyTorch is installed on each worker.

Prepare data#

The dataset used here is the ionosphere dataset; click the link to download the data.
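If you would rather fetch the file programmatically, here is a minimal sketch; the UCI repository URL below is an assumption, not part of these docs:

import urllib.request

# assumed source: the UCI Machine Learning Repository copy of the dataset
url = ('https://archive.ics.uci.edu/ml/machine-learning-databases/'
       'ionosphere/ionosphere.data')
urllib.request.urlretrieve(url, 'ionosphere.data')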

Prepare PyTorch script#

Now we create a Python file called torch_demo.py which contains the PyTorch training logic.

import os

import mars.dataframe as md
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.optim as optim
import torch.utils.data
from sklearn.preprocessing import LabelEncoder


def prepare_data():
    df = md.read_csv('ionosphere.data', header=None)

    # split into input and output columns
    X = df.iloc[:, :-1].to_tensor().astype('float32')
    y = df.iloc[:, -1].to_tensor()

    # convert Mars tensor to numpy ndarray
    X, y = X.to_numpy(), y.to_numpy()

    # encode string to integer
    y = LabelEncoder().fit_transform(y)

    return X, y


def get_model():
    return nn.Sequential(
        nn.Linear(34, 10),
        nn.ReLU(),
        nn.Linear(10, 8),
        nn.ReLU(),
        nn.Linear(8, 1),
        nn.Sigmoid(),
    )


def train():
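    # Mars sets the environment variables needed by the default env:// initialization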
    dist.init_process_group(backend="gloo")
    torch.manual_seed(42)

    data, labels = prepare_data()
    data = torch.from_numpy(data)
    labels = torch.from_numpy(labels)
    train_dataset = torch.utils.data.TensorDataset(data, labels.float())
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=32,
                                               shuffle=False,
                                               sampler=train_sampler)

    model = nn.parallel.DistributedDataParallel(get_model())
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.BCELoss()

    for epoch in range(150):  # 150 epochs
        running_loss = 0.0
        for batch_data, batch_labels in train_loader:
            outputs = model(batch_data)
            loss = criterion(outputs.squeeze(), batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        print(f"epoch {epoch}, running_loss is {running_loss}")


if __name__ == "__main__":
    train()

Mars libraries, including DataFrame, can be used directly to process massive data and accelerate preprocessing.
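For instance, a pandas-style summary of the features runs in parallel across Mars workers instead of in a single process (a sketch on the same dataset, assuming Mars's pandas-compatible describe()):

import mars.dataframe as md

df = md.read_csv('ionosphere.data', header=None)
features = df.iloc[:, :-1].astype('float32')

# pandas-compatible operations are built lazily and executed in parallel
print(features.describe().execute())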

Run PyTorch script via Mars#

The PyTorch script can now be submitted via run_pytorch_script().

In [1]: from mars.learn.contrib.pytorch import run_pytorch_script

In [2]: run_pytorch_script("torch_demo.py", n_workers=2)
task: <Task pending coro=<Event.wait() running at ./mars-dev/lib/python3.7/asyncio/locks.py:293> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f04c5027cd0>()]>>
...
epoch 148, running_loss is 0.27749747782945633
epoch 148, running_loss is 0.29025389067828655
epoch 149, running_loss is 0.2736152168363333
epoch 149, running_loss is 0.2884620577096939
Out[4]: Object <op=RunPyTorch, key=d5c40e502b77310ef359729692233d56>

Distributed training or inference#

Refer to the Run on Clusters section for deployment, or the Run on Kubernetes section for running Mars on Kubernetes.

As you can tell from torch_demo.py, Mars sets the required environment variables automatically, so you don't need to worry about the distributed setup; all you need to do is write a proper distributed PyTorch script.
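For reference, dist.init_process_group(backend='gloo') with no init_method uses PyTorch's default env:// initialization, which reads the standard environment variables below; Mars fills these in on each worker. A generic illustration (plain PyTorch, not Mars-specific code):

import os

# standard torch.distributed variables consumed by env:// initialization
for var in ('MASTER_ADDR', 'MASTER_PORT', 'RANK', 'WORLD_SIZE'):
    print(var, os.environ.get(var))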

Once a cluster exists, you can either set the session as the default, so that the training and prediction shown above are submitted to the cluster, or specify session=*** explicitly.

# A cluster has been configured, and web UI is started on <web_ip>:<web_port>
import mars
# set the session as the default one
sess = mars.new_session('http://<web_ip>:<web_port>')

# submitted to the cluster by default
run_pytorch_script('torch_demo.py', n_workers=2)

# or, the session can be specified explicitly
run_pytorch_script('torch_demo.py', n_workers=2, session=sess)

MarsDataset#

In order to use Mars to process data, we implemented MarsDataset, which can convert Mars objects (mars.tensor.Tensor, mars.dataframe.DataFrame, mars.dataframe.Series) to a torch.utils.data.Dataset.

import mars.tensor as mt
import torch.utils.data
from mars.learn.contrib.pytorch import MarsDataset, RandomSampler

data = mt.random.rand(1000, 32, dtype='f4')
labels = mt.random.randint(0, 2, (1000, 10), dtype='f4')

train_dataset = MarsDataset(data, labels)
train_sampler = RandomSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=32,
                                           sampler=train_sampler)
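A quick way to sanity-check the loader is to pull one batch (a sketch; the shapes follow from the random tensors created above):

batch_data, batch_labels = next(iter(train_loader))
print(batch_data.shape, batch_labels.shape)  # torch.Size([32, 32]) torch.Size([32, 10])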

run_pytorch_script() also allows passing data to the script, so you can preprocess data with Mars and then pass it to the script.

import mars.dataframe as md
from sklearn.preprocessing import LabelEncoder


df = md.read_csv('ionosphere.data', header=None)
feature_data = df.iloc[:, :-1].astype('float32')
feature_data.execute()
labels = df.iloc[:, -1]
labels = LabelEncoder().fit_transform(labels.execute().fetch())
labels = labels.astype('float32')

run_pytorch_script(
    "torch_script.py", n_workers=2, data={'feature_data': feature_data, 'labels': labels},
    port=9945, session=sess)

torch_script.py:

from mars.learn.contrib.pytorch import DistributedSampler
from mars.learn.contrib.pytorch import MarsDataset
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.optim as optim
import torch.utils.data


def get_model():
    return nn.Sequential(
        nn.Linear(34, 10),
        nn.ReLU(),
        nn.Linear(10, 8),
        nn.ReLU(),
        nn.Linear(8, 1),
        nn.Sigmoid(),
    )


def train(feature_data, labels):
    dist.init_process_group(backend='gloo')
    torch.manual_seed(42)

    train_dataset = MarsDataset(feature_data, labels)
    train_sampler = DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=32,
                                               shuffle=False,
                                               sampler=train_sampler)

    model = nn.parallel.DistributedDataParallel(get_model())
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.BCELoss()

    for epoch in range(150):  # 150 epochs
        running_loss = 0.0
        for batch_data, batch_labels in train_loader:
            outputs = model(batch_data)
            loss = criterion(outputs.squeeze(), batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"epoch {epoch}, running_loss is {running_loss}")


if __name__ == "__main__":
    # Mars injects objects passed via the ``data`` argument into the
    # script's globals under their given keys
    feature_data = globals()['feature_data']
    labels = globals()['labels']
    train(feature_data, labels)

Result:

epoch 147, running_loss is 0.29225416854023933
epoch 147, running_loss is 0.28132784366607666
epoch 148, running_loss is 0.27749747782945633
epoch 148, running_loss is 0.29025389067828655
epoch 149, running_loss is 0.2736152168363333
epoch 149, running_loss is 0.2884620577096939
Out[7]: Object <op=RunPyTorch, key=dc3c7ab3a54a7289af15e8be5b334cf0>