python cuDF groupby 应用于有序数据
python cuDF groupby apply with ordered data
我有一些有序数据,其中有事件层次结构。每一列都是一个事件的唯一 ID,与层次结构中位于其上方的事件相关。类似于每一天的数字在一个月中是唯一的,每个月的数字在一年中是唯一的。我想让最低级别在最高级别内是唯一的,比如通过从 1 到 365 编号使一年中的每一天都是唯一的。我的用例并不特定于日、月和年。
之前:
| ID | EVENT_1 | EVENT_2 | EVENT_3 |
| -- | ------- | ------- | ------- |
| 1 | 1 | 1 | 1 |
| 1 | 1 | 1 | 2 |
| 1 | 1 | 1 | 3 |
| 1 | 1 | 2 | 1 |
| 1 | 1 | 2 | 2 |
| 1 | 1 | 3 | 1 |
| 1 | 1 | 3 | 2 |
| 1 | 2 | 1 | 1 |
| 1 | 2 | 1 | 2 |
之后:
| ID | EVENT_1 | EVENT_2 | EVENT_3 | EVENT_3A |
| -- | ------- | ------- | ------- | -------- |
| 1 | 1 | 1 | 1 | 1 |
| 1 | 1 | 1 | 2 | 2 |
| 1 | 1 | 1 | 3 | 3 |
| 1 | 1 | 2 | 1 | 4 |
| 1 | 1 | 2 | 2 | 5 |
| 1 | 1 | 3 | 1 | 6 |
| 1 | 1 | 3 | 2 | 7 |
| 1 | 2 | 1 | 1 | 1 |
| 1 | 2 | 1 | 2 | 2 |
目标是得到一个列,其中每个 id 都有一个 EVENT_3A,这样 EVENT_3A 就是 EVENT_3 相对于 [=35= 发生的顺序](好像没有EVENT_2)。此外,还有许多 ID 必须独立计算。现在我正在 CPU 上执行此操作,但需要很长时间,所以我想切换到在 GPU 上执行此操作。
我的主要想法是做一个 groupby('ID').apply_grouped()
或 groupby('ID').agg()
但我不知道在 apply_grouped()
或 agg()
函数中放什么。我之前在 CPU 上使用 dask 这样做,但它更直观,因为分组的 DataFrame 直接传递给 apply()
函数。似乎在 cuDF 中我必须通过 incols 并且我无法弄清楚如何将它们视为 DataFrame。
大约有 5,000 个 ID,因此理想情况下每个分组的 ID 都将由 GPU 中的一个核心处理,但我不确定它是否可以那样工作,因为我是 GPU 编程的新手。
任何建议或解决方案都有帮助,谢谢。
The goal is to get a column where for each id, there is an EVENT_3A such that EVENT_3A is the order in which EVENT_3 happens with respect to EVENT_1 (as if there was no EVENT_2).
你描述的是一个groupby累积计数操作,key为[ID, EVENT_1]。它不是在 cuDF 中实现的 yet,因此您可能希望使用用户定义的函数。例如:
您的设置:
import cudf
from numba import cuda
import numpy as np
data = {
"ID":[1,1,1,1,1,1,1,1,1],
"EVENT_1":[1,1,1,1,1,1,1,2,2,],
"EVENT_2":[1,1,1,2,2,3,3,1,1],
"EVENT_3":[1,2,3,1,2,1,2,1,2]
}
gdf = cudf.DataFrame(data)
print(gdf)
ID EVENT_1 EVENT_2 EVENT_3
0 1 1 1 1
1 1 1 1 2
2 1 1 1 3
3 1 1 2 1
4 1 1 2 2
5 1 1 3 1
6 1 1 3 2
7 1 2 1 1
8 1 2 1 2
我们可以而且应该在这里使用 apply_grouped
。我鼓励您查看文档以完全理解这里发生的事情,但在较高级别,我们可以使用组内线程索引作为该行的索引作为计数。我们传递 EVENT_3
列,因此我们确保列名和函数参数匹配。
def cumcount(EVENT_3, cumcount):
for i in range(cuda.threadIdx.x, len(EVENT_3), cuda.blockDim.x):
cumcount[i] = i + 1 # since your exmaple counts start with 1 rather than 0
results = gdf.groupby(["ID", "EVENT_1"]).apply_grouped(cumcount,
incols=['EVENT_3'],
outcols=dict(cumcount=np.int32))
print(results.sort_index()) # get the original row order, for demonstration
ID EVENT_1 EVENT_2 EVENT_3 cumcount
0 1 1 1 1 1
1 1 1 1 2 2
2 1 1 1 3 3
3 1 1 2 1 4
4 1 1 2 2 5
5 1 1 3 1 6
6 1 1 3 2 7
7 1 2 1 1 1
8 1 2 1 2 2
作为完整性检查,您可以证明这些结果在更大的数据上匹配 pandas。
n_ids = 5000
n_rows = 10000000
df = pd.DataFrame({
"ID": np.random.choice(range(n_ids), n_rows),
"EVENT_1": np.random.choice(range(500), n_rows),
"EVENT_2": np.random.choice(range(500), n_rows),
"EVENT_3": np.random.choice(range(n_ids), n_rows)
})
gdf = cudf.from_pandas(df)
results = gdf.groupby(["ID", "EVENT_1"]).apply_grouped(cumcount,
incols=['EVENT_3'],
outcols=dict(cumcount=np.int32))
results = results.sort_index()
pdf_res = df.groupby(["ID", "EVENT_1"]).EVENT_3.cumcount() + 1
print(pdf_res.astype("int32").equals(results['cumcount'].to_pandas()))
True
请注意,如果您的行数小于 100 万并且组数合理,那么在 pandas 中使用 df.groupby([ID, EVENT_1]).EVENT_3.cumcount() + 1
可能会非常快,因为 groupby cumcount 相当有效。话虽如此,cuDF UDF 在规模上将 快很多。
我有一些有序数据,其中有事件层次结构。每一列都是一个事件的唯一 ID,与层次结构中位于其上方的事件相关。类似于每一天的数字在一个月中是唯一的,每个月的数字在一年中是唯一的。我想让最低级别在最高级别内是唯一的,比如通过从 1 到 365 编号使一年中的每一天都是唯一的。我的用例并不特定于日、月和年。
之前:
| ID | EVENT_1 | EVENT_2 | EVENT_3 |
| -- | ------- | ------- | ------- |
| 1 | 1 | 1 | 1 |
| 1 | 1 | 1 | 2 |
| 1 | 1 | 1 | 3 |
| 1 | 1 | 2 | 1 |
| 1 | 1 | 2 | 2 |
| 1 | 1 | 3 | 1 |
| 1 | 1 | 3 | 2 |
| 1 | 2 | 1 | 1 |
| 1 | 2 | 1 | 2 |
之后:
| ID | EVENT_1 | EVENT_2 | EVENT_3 | EVENT_3A |
| -- | ------- | ------- | ------- | -------- |
| 1 | 1 | 1 | 1 | 1 |
| 1 | 1 | 1 | 2 | 2 |
| 1 | 1 | 1 | 3 | 3 |
| 1 | 1 | 2 | 1 | 4 |
| 1 | 1 | 2 | 2 | 5 |
| 1 | 1 | 3 | 1 | 6 |
| 1 | 1 | 3 | 2 | 7 |
| 1 | 2 | 1 | 1 | 1 |
| 1 | 2 | 1 | 2 | 2 |
目标是得到一个列,其中每个 id 都有一个 EVENT_3A,这样 EVENT_3A 就是 EVENT_3 相对于 [=35= 发生的顺序](好像没有EVENT_2)。此外,还有许多 ID 必须独立计算。现在我正在 CPU 上执行此操作,但需要很长时间,所以我想切换到在 GPU 上执行此操作。
我的主要想法是做一个 groupby('ID').apply_grouped()
或 groupby('ID').agg()
但我不知道在 apply_grouped()
或 agg()
函数中放什么。我之前在 CPU 上使用 dask 这样做,但它更直观,因为分组的 DataFrame 直接传递给 apply()
函数。似乎在 cuDF 中我必须通过 incols 并且我无法弄清楚如何将它们视为 DataFrame。
大约有 5,000 个 ID,因此理想情况下每个分组的 ID 都将由 GPU 中的一个核心处理,但我不确定它是否可以那样工作,因为我是 GPU 编程的新手。
任何建议或解决方案都有帮助,谢谢。
The goal is to get a column where for each id, there is an EVENT_3A such that EVENT_3A is the order in which EVENT_3 happens with respect to EVENT_1 (as if there was no EVENT_2).
你描述的是一个groupby累积计数操作,key为[ID, EVENT_1]。它不是在 cuDF 中实现的 yet,因此您可能希望使用用户定义的函数。例如:
您的设置:
import cudf
from numba import cuda
import numpy as np
data = {
"ID":[1,1,1,1,1,1,1,1,1],
"EVENT_1":[1,1,1,1,1,1,1,2,2,],
"EVENT_2":[1,1,1,2,2,3,3,1,1],
"EVENT_3":[1,2,3,1,2,1,2,1,2]
}
gdf = cudf.DataFrame(data)
print(gdf)
ID EVENT_1 EVENT_2 EVENT_3
0 1 1 1 1
1 1 1 1 2
2 1 1 1 3
3 1 1 2 1
4 1 1 2 2
5 1 1 3 1
6 1 1 3 2
7 1 2 1 1
8 1 2 1 2
我们可以而且应该在这里使用 apply_grouped
。我鼓励您查看文档以完全理解这里发生的事情,但在较高级别,我们可以使用组内线程索引作为该行的索引作为计数。我们传递 EVENT_3
列,因此我们确保列名和函数参数匹配。
def cumcount(EVENT_3, cumcount):
for i in range(cuda.threadIdx.x, len(EVENT_3), cuda.blockDim.x):
cumcount[i] = i + 1 # since your exmaple counts start with 1 rather than 0
results = gdf.groupby(["ID", "EVENT_1"]).apply_grouped(cumcount,
incols=['EVENT_3'],
outcols=dict(cumcount=np.int32))
print(results.sort_index()) # get the original row order, for demonstration
ID EVENT_1 EVENT_2 EVENT_3 cumcount
0 1 1 1 1 1
1 1 1 1 2 2
2 1 1 1 3 3
3 1 1 2 1 4
4 1 1 2 2 5
5 1 1 3 1 6
6 1 1 3 2 7
7 1 2 1 1 1
8 1 2 1 2 2
作为完整性检查,您可以证明这些结果在更大的数据上匹配 pandas。
n_ids = 5000
n_rows = 10000000
df = pd.DataFrame({
"ID": np.random.choice(range(n_ids), n_rows),
"EVENT_1": np.random.choice(range(500), n_rows),
"EVENT_2": np.random.choice(range(500), n_rows),
"EVENT_3": np.random.choice(range(n_ids), n_rows)
})
gdf = cudf.from_pandas(df)
results = gdf.groupby(["ID", "EVENT_1"]).apply_grouped(cumcount,
incols=['EVENT_3'],
outcols=dict(cumcount=np.int32))
results = results.sort_index()
pdf_res = df.groupby(["ID", "EVENT_1"]).EVENT_3.cumcount() + 1
print(pdf_res.astype("int32").equals(results['cumcount'].to_pandas()))
True
请注意,如果您的行数小于 100 万并且组数合理,那么在 pandas 中使用 df.groupby([ID, EVENT_1]).EVENT_3.cumcount() + 1
可能会非常快,因为 groupby cumcount 相当有效。话虽如此,cuDF UDF 在规模上将 快很多。