在 Small/Partitioned 个数据帧上使用延迟的 Dask
Using Dask Delayed on Small/Partitioned Dataframes
我正在处理时间序列数据,其格式为每一行都是 ID/time/data 的单个实例。这意味着每个 ID 的行不是 1 到 1 对应的。每个 ID 在时间上都有很多行。
我正在尝试使用 dask delayed 在整个 ID 序列上使用 运行 函数(有意义的是该操作应该能够同时在每个单独的 ID 上 运行因为它们互不影响)。为此,我首先遍历每个 ID 标签,pulling/locating 来自该 ID 的所有数据(pandas 中有 .loc,因此它是一个单独的 "mini" df),然后延迟对迷你 df 的函数调用,添加一个包含延迟值的列并将其添加到所有迷你 df 的列表中。在 for 循环结束时,我想立即在所有 mini-dfs 上调用 dask.compute() 但由于某种原因,mini df 的值仍然延迟。下面我将 post 一些关于我刚刚试图解释的内容的伪代码。
我觉得这可能不是解决此问题的最佳方法,但当时这是有意义的,我不明白哪里出了问题,因此非常感谢您的帮助。
这是我正在尝试做的事情:
list_of_mini_dfs = []
for id in big_df:
curr_df = big_df.loc[big_df['id'] == id]
curr_df['new value 1'] = dask.delayed(myfunc)(args1)
curr_df['new value 2'] = dask.delayed(myfunc)(args2) #same func as previous line
list_of_mini_dfs.append(curr_df)
list_of_mini_dfs = dask.delayed(list_of_mini_dfs).compute()
Concat all mini dfs into new big df.
正如您在代码中看到的那样,我必须进入我的 big/overall 数据框以提取每个 ID 的数据序列,因为它散布在各行中。我希望能够对该单个 ID 的数据调用延迟函数,然后 return 函数调用中的值进入 big/overall 数据帧。
目前这种方法不起作用,当我将所有迷你数据帧连接在一起时,我延迟的两个值仍然延迟,这让我认为这是由于我在df 并尝试计算数据帧列表。我只是看不出如何解决它。
希望这是相对清楚的,感谢您的帮助。
IIUC 您正在尝试使用 dask 进行某种 transform
。
import pandas as pd
import dask.dataframe as dd
import numpy as np
# generate big_df
dates = pd.date_range(start='2019-01-01',
end='2020-01-01')
l = len(dates)
out = []
for i in range(1000):
df = pd.DataFrame({"ID":[i]*l,
"date": dates,
"data0": np.random.randn(l),
"data1": np.random.randn(l)})
out.append(df)
big_df = pd.concat(out, ignore_index=True)\
.sample(frac=1)\
.reset_index(drop=True)
现在您想在 data0
和 data1
列上应用函数 fun
Pandas
out = big_df.groupby("ID")[["data0","data1"]]\
.apply(fun)\
.reset_index()
df_pd = pd.merge(big_df, out, how="left", on="ID" )
达斯克
df = dd.from_pandas(big_df, npartitions=4)
out = df.groupby("ID")[["data0","data1"]]\
.apply(fun, meta={'data0':'f8',
'data1':'f8'})\
.rename(columns={'data0': 'new_values0',
'data1': 'new_values1'})\
.compute() # Here you need to compute otherwise you'll get NaNs
df_dask = dd.merge(df, out,
how="left",
left_on=["ID"],
right_index=True)
dask 版本不一定比 pandas 版本快。特别是如果您的 df
适合 RAM。
我正在处理时间序列数据,其格式为每一行都是 ID/time/data 的单个实例。这意味着每个 ID 的行不是 1 到 1 对应的。每个 ID 在时间上都有很多行。
我正在尝试使用 dask delayed 在整个 ID 序列上使用 运行 函数(有意义的是该操作应该能够同时在每个单独的 ID 上 运行因为它们互不影响)。为此,我首先遍历每个 ID 标签,pulling/locating 来自该 ID 的所有数据(pandas 中有 .loc,因此它是一个单独的 "mini" df),然后延迟对迷你 df 的函数调用,添加一个包含延迟值的列并将其添加到所有迷你 df 的列表中。在 for 循环结束时,我想立即在所有 mini-dfs 上调用 dask.compute() 但由于某种原因,mini df 的值仍然延迟。下面我将 post 一些关于我刚刚试图解释的内容的伪代码。
我觉得这可能不是解决此问题的最佳方法,但当时这是有意义的,我不明白哪里出了问题,因此非常感谢您的帮助。
这是我正在尝试做的事情:
list_of_mini_dfs = []
for id in big_df:
curr_df = big_df.loc[big_df['id'] == id]
curr_df['new value 1'] = dask.delayed(myfunc)(args1)
curr_df['new value 2'] = dask.delayed(myfunc)(args2) #same func as previous line
list_of_mini_dfs.append(curr_df)
list_of_mini_dfs = dask.delayed(list_of_mini_dfs).compute()
Concat all mini dfs into new big df.
正如您在代码中看到的那样,我必须进入我的 big/overall 数据框以提取每个 ID 的数据序列,因为它散布在各行中。我希望能够对该单个 ID 的数据调用延迟函数,然后 return 函数调用中的值进入 big/overall 数据帧。
目前这种方法不起作用,当我将所有迷你数据帧连接在一起时,我延迟的两个值仍然延迟,这让我认为这是由于我在df 并尝试计算数据帧列表。我只是看不出如何解决它。
希望这是相对清楚的,感谢您的帮助。
IIUC 您正在尝试使用 dask 进行某种 transform
。
import pandas as pd
import dask.dataframe as dd
import numpy as np
# generate big_df
dates = pd.date_range(start='2019-01-01',
end='2020-01-01')
l = len(dates)
out = []
for i in range(1000):
df = pd.DataFrame({"ID":[i]*l,
"date": dates,
"data0": np.random.randn(l),
"data1": np.random.randn(l)})
out.append(df)
big_df = pd.concat(out, ignore_index=True)\
.sample(frac=1)\
.reset_index(drop=True)
现在您想在 data0
和 data1
fun
Pandas
out = big_df.groupby("ID")[["data0","data1"]]\
.apply(fun)\
.reset_index()
df_pd = pd.merge(big_df, out, how="left", on="ID" )
达斯克
df = dd.from_pandas(big_df, npartitions=4)
out = df.groupby("ID")[["data0","data1"]]\
.apply(fun, meta={'data0':'f8',
'data1':'f8'})\
.rename(columns={'data0': 'new_values0',
'data1': 'new_values1'})\
.compute() # Here you need to compute otherwise you'll get NaNs
df_dask = dd.merge(df, out,
how="left",
left_on=["ID"],
right_index=True)
dask 版本不一定比 pandas 版本快。特别是如果您的 df
适合 RAM。