Dask 和 fbprophet
Dask and fbprophet
我正在尝试同时使用 dask
和 fbprophet
库,但我要么做错了什么,要么遇到意外的性能问题。
import dask.dataframe as dd
import datetime as dt
import multiprocessing as mp
import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None
from fbprophet import Prophet
import time
ncpu = mp.cpu_count()
def parallel_pd(fun, vec, pool = ncpu-1):
with mp.Pool(pool) as p:
res = p.map(fun,vec)
return(res)
def forecast1dd(ts):
time.sleep(0.1)
return ts["y"].max()
def forecast1mp(key):
ts = df[df["key"]==key]
time.sleep(0.1)
return ts["y"].max()
def forecast2dd(ts):
future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1),
periods=7, freq="D")})
key = ts.name
model = Prophet(yearly_seasonality=True)
model.fit(ts)
forecast = model.predict(future)
future["yhat"] = forecast["yhat"]
future["key"] = key
return future.as_matrix()
def forecast2mp(key):
ts = df[df["key"]==key]
future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1),
periods=7, freq="D")})
model = Prophet(yearly_seasonality=True)
model.fit(ts)
forecast = model.predict(future)
future["yhat"] = forecast["yhat"]
future["key"] = key
return future.as_matrix()
一方面,我有一个自定义函数,运行时间约为 0.1 秒,因此 forecast1dd
和 forecast1mp
正在模拟我的函数和以下数据帧
N = 2*365
key_n = 5000
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"),
"y":np.random.normal(100,20,N),
"key":np.repeat(str(k),N)}) for k in range(key_n)])
keys = df.key.unique()
df = df.sample(frac=1).reset_index(drop=True)
ddf = dd.from_pandas(df, npartitions=ncpu*2)
我得到(分别)
%%time
grp = ddf.groupby("key").apply(forecast1dd, meta=pd.Series(name="s"))
df1dd = grp.to_frame().compute()
CPU times: user 7.7 s, sys: 400 ms, total: 8.1 s
Wall time: 1min 8s
%%time
res = parallel_pd(forecast1mp,keys)
CPU times: user 820 ms, sys: 360 ms, total: 1.18 s
Wall time: 10min 36s
第一种情况下内核没有100%使用,但性能符合我的实际情况。使用线路分析器很容易检查,第二种情况下性能低下的罪魁祸首是ts = df[df["key"]==key]
,如果我们有更多的键,情况会变得更糟。
所以到目前为止我对 dask
很满意。但是每当我尝试使用 fbprophet
时,情况就会发生变化。在这里我使用较少的 keys
但不太可能以前的情况 dask
性能总是比 multiprocessing
.
差
N = 2*365
key_n = 200
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"),
"y":np.random.normal(100,20,N),
"key":np.repeat(str(k),N)}) for k in range(key_n)])
keys = df.key.unique()
df = df.sample(frac=1).reset_index(drop=True)
ddf = dd.from_pandas(df, npartitions=ncpu*2)
%%time
grp = ddf.groupby("key").apply(forecast2dd,
meta=pd.Series(name="s")).to_frame().compute()
df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values])
CPU times: user 3min 42s, sys: 15 s, total: 3min 57s
Wall time: 3min 30s
%%time
res = parallel_pd(forecast2mp,keys)
df2mp = pd.concat([pd.DataFrame(a) for a in res])
CPU times: user 76 ms, sys: 160 ms, total: 236 ms
Wall time: 39.4 s
现在我的问题是:
- 如何使用 dask 提高先知的性能?
- 我应该怎么做才能让 dask 100% 使用内核?
我怀疑Prophet持有GIL,所以在计算ddf.groupby("key").apply(forecast2dd, meta=pd.Series(name="s")
时,只有一个线程可以运行 Python一次编码。使用 multiprocessing
可以避免这种情况,但代价是必须复制数据 ncpu
次。这应该与您的 parallel_pd
函数具有相似的 运行time。
%%time
with dask.set_options(get=dask.multiprocessing.get):
grp = ddf.groupby("key").apply(forecast2dd,
meta=pd.Series(name="s")).to_frame().compute()
df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values])
CPU times: user 2.47 s, sys: 251 ms, total: 2.72 s
Wall time: 1min 27s
您可以尝试询问 Prophet 开发人员是否需要持有 GIL。我怀疑问题出在 PyStan 中,并且当实际的 Stan 求解器 运行ning 时,他们可能不需要 GIL。有一个 Github 问题 here
旁注:由于您的示例 forecast1dd
是一个聚合,因此使用 dd.Aggregation
:
可以 运行 快得多
%%time
def forcast1dd_chunk(ts):
time.sleep(0.1)
return ts.max()
def forecast1dd_agg(ts):
return ts.max()
f1dd = dd.Aggregation("forecast1dd", forcast1dd_chunk, forecast1dd_agg)
grp = ddf.groupby("key")[['y']].agg(f1dd)
x = grp.compute()
CPU times: user 59.5 ms, sys: 5.13 ms, total: 64.7 ms
Wall time: 355 ms
尽管这不符合您的实际问题,这不是聚合。
我正在尝试同时使用 dask
和 fbprophet
库,但我要么做错了什么,要么遇到意外的性能问题。
import dask.dataframe as dd
import datetime as dt
import multiprocessing as mp
import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None
from fbprophet import Prophet
import time
ncpu = mp.cpu_count()
def parallel_pd(fun, vec, pool = ncpu-1):
with mp.Pool(pool) as p:
res = p.map(fun,vec)
return(res)
def forecast1dd(ts):
time.sleep(0.1)
return ts["y"].max()
def forecast1mp(key):
ts = df[df["key"]==key]
time.sleep(0.1)
return ts["y"].max()
def forecast2dd(ts):
future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1),
periods=7, freq="D")})
key = ts.name
model = Prophet(yearly_seasonality=True)
model.fit(ts)
forecast = model.predict(future)
future["yhat"] = forecast["yhat"]
future["key"] = key
return future.as_matrix()
def forecast2mp(key):
ts = df[df["key"]==key]
future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1),
periods=7, freq="D")})
model = Prophet(yearly_seasonality=True)
model.fit(ts)
forecast = model.predict(future)
future["yhat"] = forecast["yhat"]
future["key"] = key
return future.as_matrix()
一方面,我有一个自定义函数,运行时间约为 0.1 秒,因此 forecast1dd
和 forecast1mp
正在模拟我的函数和以下数据帧
N = 2*365
key_n = 5000
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"),
"y":np.random.normal(100,20,N),
"key":np.repeat(str(k),N)}) for k in range(key_n)])
keys = df.key.unique()
df = df.sample(frac=1).reset_index(drop=True)
ddf = dd.from_pandas(df, npartitions=ncpu*2)
我得到(分别)
%%time
grp = ddf.groupby("key").apply(forecast1dd, meta=pd.Series(name="s"))
df1dd = grp.to_frame().compute()
CPU times: user 7.7 s, sys: 400 ms, total: 8.1 s
Wall time: 1min 8s
%%time
res = parallel_pd(forecast1mp,keys)
CPU times: user 820 ms, sys: 360 ms, total: 1.18 s
Wall time: 10min 36s
第一种情况下内核没有100%使用,但性能符合我的实际情况。使用线路分析器很容易检查,第二种情况下性能低下的罪魁祸首是ts = df[df["key"]==key]
,如果我们有更多的键,情况会变得更糟。
所以到目前为止我对 dask
很满意。但是每当我尝试使用 fbprophet
时,情况就会发生变化。在这里我使用较少的 keys
但不太可能以前的情况 dask
性能总是比 multiprocessing
.
N = 2*365
key_n = 200
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"),
"y":np.random.normal(100,20,N),
"key":np.repeat(str(k),N)}) for k in range(key_n)])
keys = df.key.unique()
df = df.sample(frac=1).reset_index(drop=True)
ddf = dd.from_pandas(df, npartitions=ncpu*2)
%%time
grp = ddf.groupby("key").apply(forecast2dd,
meta=pd.Series(name="s")).to_frame().compute()
df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values])
CPU times: user 3min 42s, sys: 15 s, total: 3min 57s
Wall time: 3min 30s
%%time
res = parallel_pd(forecast2mp,keys)
df2mp = pd.concat([pd.DataFrame(a) for a in res])
CPU times: user 76 ms, sys: 160 ms, total: 236 ms
Wall time: 39.4 s
现在我的问题是:
- 如何使用 dask 提高先知的性能?
- 我应该怎么做才能让 dask 100% 使用内核?
我怀疑Prophet持有GIL,所以在计算ddf.groupby("key").apply(forecast2dd, meta=pd.Series(name="s")
时,只有一个线程可以运行 Python一次编码。使用 multiprocessing
可以避免这种情况,但代价是必须复制数据 ncpu
次。这应该与您的 parallel_pd
函数具有相似的 运行time。
%%time
with dask.set_options(get=dask.multiprocessing.get):
grp = ddf.groupby("key").apply(forecast2dd,
meta=pd.Series(name="s")).to_frame().compute()
df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values])
CPU times: user 2.47 s, sys: 251 ms, total: 2.72 s
Wall time: 1min 27s
您可以尝试询问 Prophet 开发人员是否需要持有 GIL。我怀疑问题出在 PyStan 中,并且当实际的 Stan 求解器 运行ning 时,他们可能不需要 GIL。有一个 Github 问题 here
旁注:由于您的示例 forecast1dd
是一个聚合,因此使用 dd.Aggregation
:
%%time
def forcast1dd_chunk(ts):
time.sleep(0.1)
return ts.max()
def forecast1dd_agg(ts):
return ts.max()
f1dd = dd.Aggregation("forecast1dd", forcast1dd_chunk, forecast1dd_agg)
grp = ddf.groupby("key")[['y']].agg(f1dd)
x = grp.compute()
CPU times: user 59.5 ms, sys: 5.13 ms, total: 64.7 ms
Wall time: 355 ms
尽管这不符合您的实际问题,这不是聚合。