Python 基于列值的 Dask 数据帧分离
Python Dask dataframe separation based on column value
我是 python dask 的新手(对 pandas 有点经验)。我有一个很大的 Dask Dataframe(约 10 到 2000 万行),我必须根据唯一的列值将其分开。
例如,如果我有以下包含 C1 到 Cn 列的 Dataframe(抱歉,不知道如何在 Whosebug 中创建正确的 table)并且我想为每个唯一值创建子集 Dataframes C2
列
Base Dataframe:
|Ind| C1 | C2 |....| Cn |
|-----------------------|
| 1 |val1| AE |....|time|
|-----------------------|
| 2 |val2| FB |....|time|
|-----------------------|
|...|....| .. |....| ...|
|-----------------------|
| n |valn| QK |....|time|
Subset Dataframes to be created:
Subset 1:
|Ind| C1 | C2 |....| Cn |
|-----------------------|
| 1 |val1| AE |....|time|
|-----------------------|
| 2 |val2| AE |....|time|
|-----------------------|
|...|....| .. |....| ...|
|-----------------------|
| n |valn| AE |....|time|
Subset 2
|Ind| C1 | C2 |....| Cn |
|-----------------------|
| 1 |val1| FB |....|time|
|-----------------------|
| 2 |val2| FB |....|time|
|-----------------------|
|...|....| .. |....| ...|
|-----------------------|
| n |valn| FB |....|time|
and so on.
我目前的方法是获取 C2 的所有唯一值,并为每个值迭代过滤基础数据帧。但这需要很长时间。我目前正在研究如何改进这个过程,但如果你们中的任何人能给我一些提示,我将不胜感激。
在我看来,您可以在 pandas
和 dask
.
中使用 groupby
获得相同的子集
import pandas as pd
import dask.dataframe as dd
import numpy as np
import string
N = 5
rndm2 = lambda :"".join(np.random.choice(list(string.ascii_lowercase), 2))
df_sample = pd.DataFrame({"C1":np.arange(N),
"C2":[rndm2() for i in range(N)],
"C3":np.random.randn(N)})
M = 2
df = pd.concat([df_sample for i in range(M)], ignore_index=True)
df["C4"] = np.random.randn(N*M)
这里我只是打印 print(list(df.groupby("C2"))[0][1])
来向您展示每个组中的内容:
C1 C2 C3 C4
3 3 bx 0.668654 -0.237081
8 3 bx 0.668654 0.619883
如果您需要对磁盘进行良好分区,您可以执行以下操作
ddf = dd.from_pandas(df, npartitions=4)
ddf.to_parquet("saved/", partition_on=["C2"])
# You can check that the parquet files
# are in separated folder as
! ls saved/ # If you are on Linux
'C2=iw' 'C2=jl' 'C2=qf' 'C2=wy' 'C2=yr' _common_metadata
现在,如果您想使用这些组执行一些计算,您可以应用您的函数 fun
和 map_partitions
并注意输出元数据。
df = dd.read_parquet("saved/")
out = df.map_partitions(lambda x: fun(x)).compute() # you should add your output meta
我是 python dask 的新手(对 pandas 有点经验)。我有一个很大的 Dask Dataframe(约 10 到 2000 万行),我必须根据唯一的列值将其分开。
例如,如果我有以下包含 C1 到 Cn 列的 Dataframe(抱歉,不知道如何在 Whosebug 中创建正确的 table)并且我想为每个唯一值创建子集 Dataframes C2
列Base Dataframe:
|Ind| C1 | C2 |....| Cn |
|-----------------------|
| 1 |val1| AE |....|time|
|-----------------------|
| 2 |val2| FB |....|time|
|-----------------------|
|...|....| .. |....| ...|
|-----------------------|
| n |valn| QK |....|time|
Subset Dataframes to be created:
Subset 1:
|Ind| C1 | C2 |....| Cn |
|-----------------------|
| 1 |val1| AE |....|time|
|-----------------------|
| 2 |val2| AE |....|time|
|-----------------------|
|...|....| .. |....| ...|
|-----------------------|
| n |valn| AE |....|time|
Subset 2
|Ind| C1 | C2 |....| Cn |
|-----------------------|
| 1 |val1| FB |....|time|
|-----------------------|
| 2 |val2| FB |....|time|
|-----------------------|
|...|....| .. |....| ...|
|-----------------------|
| n |valn| FB |....|time|
and so on.
我目前的方法是获取 C2 的所有唯一值,并为每个值迭代过滤基础数据帧。但这需要很长时间。我目前正在研究如何改进这个过程,但如果你们中的任何人能给我一些提示,我将不胜感激。
在我看来,您可以在 pandas
和 dask
.
groupby
获得相同的子集
import pandas as pd
import dask.dataframe as dd
import numpy as np
import string
N = 5
rndm2 = lambda :"".join(np.random.choice(list(string.ascii_lowercase), 2))
df_sample = pd.DataFrame({"C1":np.arange(N),
"C2":[rndm2() for i in range(N)],
"C3":np.random.randn(N)})
M = 2
df = pd.concat([df_sample for i in range(M)], ignore_index=True)
df["C4"] = np.random.randn(N*M)
这里我只是打印 print(list(df.groupby("C2"))[0][1])
来向您展示每个组中的内容:
C1 C2 C3 C4
3 3 bx 0.668654 -0.237081
8 3 bx 0.668654 0.619883
如果您需要对磁盘进行良好分区,您可以执行以下操作
ddf = dd.from_pandas(df, npartitions=4)
ddf.to_parquet("saved/", partition_on=["C2"])
# You can check that the parquet files
# are in separated folder as
! ls saved/ # If you are on Linux
'C2=iw' 'C2=jl' 'C2=qf' 'C2=wy' 'C2=yr' _common_metadata
现在,如果您想使用这些组执行一些计算,您可以应用您的函数 fun
和 map_partitions
并注意输出元数据。
df = dd.read_parquet("saved/")
out = df.map_partitions(lambda x: fun(x)).compute() # you should add your output meta