Mars is a tensor-based unified framework for large-scale data computation.
Mars tensor provides a familiar interface like NumPy.
**NumPy**

```python
import numpy as np

N = 200_000_000
a = np.random.uniform(-1, 1, size=(N, 2))
print((np.linalg.norm(a, axis=1) < 1).sum() * 4 / N)
```

```
3.14151712
CPU times: user 12.5 s, sys: 7.16 s, total: 19.7 s
Wall time: 21.8 s
```

**Mars tensor**

```python
import mars.tensor as mt

N = 200_000_000
a = mt.random.uniform(-1, 1, size=(N, 2))
print(((mt.linalg.norm(a, axis=1) < 1).sum() * 4 / N).execute())
```

```
3.14161908
CPU times: user 17.5 s, sys: 3.56 s, total: 21.1 s
Wall time: 5.59 s
```
Mars can leverage multiple cores, even on a laptop, and can be even faster in a distributed setting.
Mars DataFrame provides a familiar interface like pandas.
**pandas**

```python
import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(100000000, 4),
                  columns=list('abcd'))
print(df.sum())
```

```
CPU times: user 10.9 s, sys: 2.69 s, total: 13.6 s
Wall time: 11 s
```

**Mars DataFrame**

```python
import mars.tensor as mt
import mars.dataframe as md

df = md.DataFrame(mt.random.rand(100000000, 4),
                  columns=list('abcd'))
print(df.sum().execute())
```

```
CPU times: user 16.5 s, sys: 3.52 s, total: 20 s
Wall time: 3.6 s
```
Mars learn provides a familiar interface like scikit-learn.
**Scikit-learn**

```python
from sklearn.datasets import make_blobs
from sklearn.decomposition import PCA

X, y = make_blobs(
    n_samples=100000000, n_features=3,
    centers=[[3, 3, 3], [0, 0, 0],
             [1, 1, 1], [2, 2, 2]],
    cluster_std=[0.2, 0.1, 0.2, 0.2],
    random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_)
print(pca.explained_variance_)
```

**Mars learn**

```python
from mars.learn.datasets import make_blobs
from mars.learn.decomposition import PCA

X, y = make_blobs(
    n_samples=100000000, n_features=3,
    centers=[[3, 3, 3], [0, 0, 0],
             [1, 1, 1], [2, 2, 2]],
    cluster_std=[0.2, 0.1, 0.2, 0.2],
    random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_)
print(pca.explained_variance_)
```
Mars remote allows users to execute functions in parallel.
**Vanilla function calls**

```python
import numpy as np

def calc_chunk(n, i):
    rs = np.random.RandomState(i)
    a = rs.uniform(-1, 1, size=(n, 2))
    d = np.linalg.norm(a, axis=1)
    return (d < 1).sum()

def calc_pi(fs, N):
    return sum(fs) * 4 / N

N = 200_000_000
n = 10_000_000

fs = [calc_chunk(n, i)
      for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)
```

```
3.1416312
CPU times: user 32.2 s, sys: 4.86 s, total: 37.1 s
Wall time: 12.4 s
```

**Mars remote**

```python
import numpy as np
import mars.remote as mr

def calc_chunk(n, i):
    rs = np.random.RandomState(i)
    a = rs.uniform(-1, 1, size=(n, 2))
    d = np.linalg.norm(a, axis=1)
    return (d < 1).sum()

def calc_pi(fs, N):
    return sum(fs) * 4 / N

N = 200_000_000
n = 10_000_000

fs = [mr.spawn(calc_chunk, args=(n, i))
      for i in range(N // n)]
pi = mr.spawn(calc_pi, args=(fs, N))
print(pi.execute().fetch())
```

```
3.1416312
CPU times: user 16.9 s, sys: 5.46 s, total: 22.3 s
Wall time: 4.83 s
```
Mars can scale in to a single machine, and scale out to a cluster with hundreds of machines. The local and distributed versions share the same code, so it's fairly simple to migrate from a single machine to a cluster as data grows.
Mars can run in a few ways:
Local thread-based scheduling
Local process-based scheduling
Run on cluster
Run on Kubernetes