.. _integrate_pytorch:

*************************
Integrate with PyTorch
*************************

.. currentmodule:: mars.learn.contrib.pytorch

This introduction gives a brief tour of how to integrate `PyTorch
<https://pytorch.org/>`_ with Mars.
Installation
------------

If you are trying to use Mars on a single machine, e.g. on your laptop, make
sure PyTorch is installed.

You can install PyTorch via pip:

.. code-block:: bash

    pip3 install torch torchvision torchaudio

Visit the `installation guide for PyTorch
<https://pytorch.org/get-started/locally/>`_ for more information.

On the other hand, if you are about to use Mars on a cluster, make sure
PyTorch is installed on each worker.
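
To verify the installation on every node, including the availability of the
``gloo`` backend used by the training scripts below, a quick check like the
following can help:

.. code-block:: python

    import torch
    import torch.distributed as dist

    print(torch.__version__)
    # the scripts below rely on the gloo backend for CPU training
    print(dist.is_available(), dist.is_gloo_available())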
Prepare data
------------

The dataset used here is the `ionosphere dataset
<https://archive.ics.uci.edu/ml/machine-learning-databases/ionosphere/ionosphere.data>`_;
click the link to download the data.
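
Alternatively, the file can be fetched programmatically; a minimal sketch,
assuming the UCI mirror linked above:

.. code-block:: python

    import urllib.request

    # download the dataset next to the scripts below, which read
    # 'ionosphere.data' from the working directory
    URL = ('https://archive.ics.uci.edu/ml/machine-learning-databases/'
           'ionosphere/ionosphere.data')
    urllib.request.urlretrieve(URL, 'ionosphere.data')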
Prepare PyTorch script
----------------------

Now we create a Python file called ``torch_demo.py`` which contains the
PyTorch logic.

.. code-block:: python

    import mars.dataframe as md
    import torch
    import torch.nn as nn
    import torch.distributed as dist
    import torch.optim as optim
    import torch.utils.data
    from sklearn.preprocessing import LabelEncoder


    def prepare_data():
        df = md.read_csv('ionosphere.data', header=None)
        # split into input and output columns
        X = df.iloc[:, :-1].to_tensor().astype('float32')
        y = df.iloc[:, -1].to_tensor()
        # convert Mars tensors to numpy ndarrays
        X, y = X.to_numpy(), y.to_numpy()
        # encode string labels to integers
        y = LabelEncoder().fit_transform(y)
        return X, y


    def get_model():
        return nn.Sequential(
            nn.Linear(34, 10),
            nn.ReLU(),
            nn.Linear(10, 8),
            nn.ReLU(),
            nn.Linear(8, 1),
            nn.Sigmoid(),
        )


    def train():
        dist.init_process_group(backend='gloo')
        torch.manual_seed(42)

        data, labels = prepare_data()
        data = torch.from_numpy(data)
        labels = torch.from_numpy(labels)

        train_dataset = torch.utils.data.TensorDataset(data, labels.float())
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
        train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                                   batch_size=32,
                                                   shuffle=False,
                                                   sampler=train_sampler)

        model = nn.parallel.DistributedDataParallel(get_model())
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.BCELoss()

        for epoch in range(150):  # 150 epochs
            running_loss = 0.0
            for _, (batch_data, batch_labels) in enumerate(train_loader):
                outputs = model(batch_data)
                loss = criterion(outputs.squeeze(), batch_labels)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
            print(f"epoch {epoch}, running_loss is {running_loss}")


    if __name__ == "__main__":
        train()
Mars libraries, including Mars DataFrame and so forth, can be used directly
inside the script to process massive data and accelerate preprocessing.
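
For instance, feature columns can be scaled with the pandas-compatible Mars
DataFrame API before being converted to tensors; a minimal sketch, assuming
min-max scaling is desired:

.. code-block:: python

    import mars.dataframe as md

    df = md.read_csv('ionosphere.data', header=None)
    feats = df.iloc[:, :-1]
    # pandas-style arithmetic, executed distributed on Mars workers
    feats = (feats - feats.min()) / (feats.max() - feats.min())
    print(feats.execute())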
Run PyTorch script via Mars
---------------------------

The PyTorch script can now be submitted via :meth:`run_pytorch_script`.

.. code-block:: ipython

    In [1]: from mars.learn.contrib.pytorch import run_pytorch_script

    In [2]: run_pytorch_script("torch_demo.py", n_workers=2)
    ...
    epoch 148, running_loss is 0.27749747782945633
    epoch 148, running_loss is 0.29025389067828655
    epoch 149, running_loss is 0.2736152168363333
    epoch 149, running_loss is 0.2884620577096939
    Out[4]: Object
Distributed training or inference
---------------------------------

Refer to the :ref:`deploy` section for deployment, or the :ref:`k8s` section
for running Mars on Kubernetes.

As you can tell from ``torch_demo.py``, Mars sets the required environment
variables automatically, so you don't need to worry about the distributed
setup; all you need to do is write a proper `distributed PyTorch script
<https://pytorch.org/tutorials/intermediate/ddp_tutorial.html>`_.
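
Concretely, ``init_process_group`` with the default ``env://`` method picks up
``MASTER_ADDR``, ``MASTER_PORT``, ``RANK`` and ``WORLD_SIZE`` from the
environment; a minimal sketch of what a script can rely on:

.. code-block:: python

    import os

    import torch.distributed as dist

    # the variables env:// initialization expects; Mars is expected to
    # populate the distributed environment for each worker (see above)
    for var in ('MASTER_ADDR', 'MASTER_PORT', 'RANK', 'WORLD_SIZE'):
        print(var, os.environ.get(var))

    # with env:// (the default), init_process_group reads them itself
    dist.init_process_group(backend='gloo')
    print('rank', dist.get_rank(), 'of', dist.get_world_size())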
Once a cluster exists, you can either set the session as the default, so that
the training and prediction shown above will be submitted to the cluster, or
you can specify ``session=***`` explicitly as well.

.. code-block:: python

    # a cluster has been configured, and the web UI is started on <web_ip>:<web_port>
    import mars

    # set the session as the default one
    sess = mars.new_session('http://<web_ip>:<web_port>')

    # submitted to the cluster by default
    run_pytorch_script('torch_demo.py', n_workers=2)

    # or, the session could be specified explicitly as well
    run_pytorch_script('torch_demo.py', n_workers=2, session=sess)
MarsDataset
-----------

In order to use Mars to process data, we implemented a :class:`MarsDataset` that
can convert Mars objects (:class:`mars.tensor.Tensor`,
:class:`mars.dataframe.DataFrame`, :class:`mars.dataframe.Series`) to a
``torch.utils.data.Dataset``.
.. code-block:: python

    import torch.utils.data

    import mars.tensor as mt
    from mars.learn.contrib.pytorch import MarsDataset, RandomSampler

    data = mt.random.rand(1000, 32, dtype='f4')
    labels = mt.random.randint(0, 2, (1000, 10), dtype='f4')

    train_dataset = MarsDataset(data, labels)
    train_sampler = RandomSampler(train_dataset)

    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=32,
                                               sampler=train_sampler)
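
The loader then behaves like any other ``DataLoader``; for example, fetching
the first batch:

.. code-block:: python

    batch_data, batch_labels = next(iter(train_loader))
    print(batch_data.shape)    # a (32, 32) feature batch
    print(batch_labels.shape)  # a (32, 10) label batch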
Now :meth:`run_pytorch_script` allows passing data to the script. So you can
preprocess data via Mars, then pass it to the script.

.. code-block:: python

    import mars.dataframe as md
    from sklearn.preprocessing import LabelEncoder

    df = md.read_csv('ionosphere.data', header=None)

    feature_data = df.iloc[:, :-1].astype('float32')
    feature_data.execute()

    labels = df.iloc[:, -1]
    labels = LabelEncoder().fit_transform(labels.execute().fetch())
    labels = labels.astype('float32')

    run_pytorch_script(
        "torch_script.py", n_workers=2,
        data={'feature_data': feature_data, 'labels': labels},
        port=9945, session=sess)
The content of ``torch_script.py``:

.. code-block:: python

    import torch
    import torch.nn as nn
    import torch.distributed as dist
    import torch.optim as optim
    import torch.utils.data

    from mars.learn.contrib.pytorch import DistributedSampler, MarsDataset


    def get_model():
        return nn.Sequential(
            nn.Linear(34, 10),
            nn.ReLU(),
            nn.Linear(10, 8),
            nn.ReLU(),
            nn.Linear(8, 1),
            nn.Sigmoid(),
        )


    def train(feature_data, labels):
        dist.init_process_group(backend='gloo')
        torch.manual_seed(42)

        train_dataset = MarsDataset(feature_data, labels)
        train_sampler = DistributedSampler(train_dataset)
        train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                                   batch_size=32,
                                                   shuffle=False,
                                                   sampler=train_sampler)

        model = nn.parallel.DistributedDataParallel(get_model())
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.BCELoss()

        for epoch in range(150):  # 150 epochs
            running_loss = 0.0
            for _, (batch_data, batch_labels) in enumerate(train_loader):
                outputs = model(batch_data)
                loss = criterion(outputs.squeeze(), batch_labels)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
            print(f"epoch {epoch}, running_loss is {running_loss}")


    if __name__ == "__main__":
        # objects passed via ``data=`` are injected into the script's globals
        feature_data = globals()['feature_data']
        labels = globals()['labels']
        train(feature_data, labels)
The result:

.. code-block:: ipython

    epoch 147, running_loss is 0.29225416854023933
    epoch 147, running_loss is 0.28132784366607666
    epoch 148, running_loss is 0.27749747782945633
    epoch 148, running_loss is 0.29025389067828655
    epoch 149, running_loss is 0.2736152168363333
    epoch 149, running_loss is 0.2884620577096939
    Out[7]: Object