Mars 是基于张量的,用于进行大规模数据计算的统一计算框架。
文档
Mars tensor - 提供类似 Numpy API 的接口。
Numpy
Mars Tensor
import numpy as np N = 200_000_000 a = np.random.uniform(-1, 1, size=(N, 2)) print((np.linalg.norm(a, axis=1) < 1) .sum() * 4 / N)
import mars.tensor as mt N = 200_000_000 a = mt.random.uniform(-1, 1, size=(N, 2)) print(((mt.linalg.norm(a, axis=1) < 1) .sum() * 4 / N).execute())
3.14151712 CPU times: user 12.5 s, sys: 7.16 s, total: 19.7 s Wall time: 21.8 s
3.14161908 CPU times: user 17.5 s, sys: 3.56 s, total: 21.1 s Wall time: 5.59 s
Mars 能利用多核,即使在你的笔记本上;当分布式运行时,速度会更快。
Mars DataFrame - 提供类似 pandas 的接口
Pandas
Mars DataFrame
import numpy as np import pandas as pd df = pd.DataFrame( np.random.rand(100000000, 4), columns=list('abcd')) print(df.sum())
import mars.tensor as mt import mars.dataframe as md df = md.DataFrame( mt.random.rand(100000000, 4), columns=list('abcd')) print(df.sum().execute())
CPU times: user 10.9 s, sys: 2.69 s, total: 13.6 s Wall time: 11 s
CPU times: user 16.5 s, sys: 3.52 s, total: 20 s Wall time: 3.6 s
Mars learn - 提供类似 scikit-learn 的接口
Scikit-learn
Mars learn
from sklearn.datasets import make_blobs from sklearn.decomposition import PCA X, y = make_blobs( n_samples=100000000, n_features=3, centers=[[3, 3, 3], [0, 0, 0], [1, 1, 1], [2, 2, 2]], cluster_std=[0.2, 0.1, 0.2, 0.2], random_state=9) pca = PCA(n_components=3) pca.fit(X) print(pca.explained_variance_ratio_) print(pca.explained_variance_)
from mars.learn.datasets import make_blobs from mars.learn.decomposition import PCA X, y = make_blobs( n_samples=100000000, n_features=3, centers=[[3, 3, 3], [0, 0, 0], [1, 1, 1], [2, 2, 2]], cluster_std=[0.2, 0.1, 0.2, 0.2], random_state=9) pca = PCA(n_components=3) pca.fit(X) print(pca.explained_variance_ratio_) print(pca.explained_variance_)
Mars remote 允许用户并行执行函数。
普通函数调用
Mars remote
import numpy as np def calc_chunk(n, i): rs = np.random.RandomState(i) a = rs.uniform(-1, 1, size=(n, 2)) d = np.linalg.norm(a, axis=1) return (d < 1).sum() def calc_pi(fs, N): return sum(fs) * 4 / N N = 200_000_000 n = 10_000_000 fs = [calc_chunk(n, i) for i in range(N // n)] pi = calc_pi(fs, N) print(pi)
import numpy as np import mars.remote as mr def calc_chunk(n, i): rs = np.random.RandomState(i) a = rs.uniform(-1, 1, size=(n, 2)) d = np.linalg.norm(a, axis=1) return (d < 1).sum() def calc_pi(fs, N): return sum(fs) * 4 / N N = 200_000_000 n = 10_000_000 fs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)] pi = mr.spawn(calc_pi, args=(fs, N)) print(pi.execute().fetch())
3.1416312 CPU times: user 32.2 s, sys: 4.86 s, total: 37.1 s Wall time: 12.4 s
3.1416312 CPU times: user 16.9 s, sys: 5.46 s, total: 22.3 s Wall time: 4.83 s
Mars 可以在单台机器上运行,也可以扩展到上百台机器组成的集群中运行,且在两种环境下可使用相同的代码。因此,Mars 可以方便地从单台机器迁移到集群,以适应数据量的增长。
Mars 能以若干种方式运行:
本地基于线程的调度
本地基于多进程的调度
在集群中运行
在 Kubernetes 中部署