Python: modified groupby ngroup in cuDF with list comprehension
I'm trying to write a function similar to pandas' groupby().ngroup(). The difference is that I want the count to restart from 0 within each subgroup. So given the following data:
| EVENT_1 | EVENT_2 |
| ------- | ------- |
| 0 | 3 |
| 0 | 3 |
| 0 | 3 |
| 0 | 5 |
| 0 | 5 |
| 0 | 5 |
| 0 | 9 |
| 0 | 9 |
| 1 | 6 |
| 1 | 6 |
I want:
| EVENT_1 | EVENT_2 | EVENT_2A |
| ------- | ------- | -------- |
| 0 | 3 | 0 |
| 0 | 3 | 0 |
| 0 | 3 | 0 |
| 0 | 5 | 1 |
| 0 | 5 | 1 |
| 0 | 5 | 1 |
| 0 | 9 | 2 |
| 0 | 9 | 2 |
| 1 | 6 | 0 |
| 1 | 6 | 0 |
The best implementation I can think of is to do a groupby() on EVENT_1, get the unique values of EVENT_2 within each group, and then set EVENT_2A to each value's index in that list of unique values. For example, in the EVENT_1 == 0 group the unique values are [3, 5, 9], so we set EVENT_2A to the index of the corresponding EVENT_2 value in that list.
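For reference, the behavior I'm after can be expressed in plain pandas (CPU-only, so it doesn't solve my cuDF problem); a minimal sketch using pd.factorize, which numbers values from 0 in order of first appearance:

```python
import pandas as pd

df = pd.DataFrame({
    "EVENT_1": [0, 0, 0, 0, 0, 0, 0, 0, 1, 1],
    "EVENT_2": [3, 3, 3, 5, 5, 5, 9, 9, 6, 6],
})
# transform() applies factorize() independently within each EVENT_1 group,
# so the numbering restarts at 0 for every group
df["EVENT_2A"] = df.groupby("EVENT_1")["EVENT_2"].transform(
    lambda s: pd.factorize(s)[0]
)
```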
My code is below. Note that EVENT_2 is always sorted within each EVENT_1 group, so it should be possible to find those unique values in O(n).
```python
import cudf
from numba import cuda
import numpy as np

def count(EVENT_2, EVENT_2A):
    # Get unique values of EVENT_2
    uq = [EVENT_2[0]] + [x for i, x in enumerate(EVENT_2) if i > 0 and EVENT_2[i-1] != x]
    for i in range(cuda.threadIdx.x, len(EVENT_2), cuda.blockDim.x):
        # Get corresponding index for each value. This can probably be sped up
        # by mapping values to indices
        for j, v in enumerate(uq):
            if v == EVENT_2[i]:
                EVENT_2A[i] = j
                break

if __name__ == "__main__":
    data = {
        "EVENT_1": [0, 0, 0, 0, 0, 0, 0, 0, 1, 1],
        "EVENT_2": [3, 3, 3, 5, 5, 5, 9, 9, 6, 6],
    }
    df = cudf.DataFrame(data)

    results = df.groupby(["EVENT_1"], method="cudf").apply_grouped(
        count,
        incols=["EVENT_2"],
        outcols={"EVENT_2A": np.int64},
    )
    print(results.sort_index())
```
The problem is that there seems to be an issue with using lists in the user-defined function count(). Numba says its nopython JIT compiler can handle list comprehensions, and indeed, when I use the function
```python
from numba import jit

@jit(nopython=True)
def uq_sorted(my_list):
    return [my_list[0]] + [x for i, x in enumerate(my_list) if i > 0 and my_list[i-1] != x]
```
it works, albeit with a deprecation warning.
The error I get with cudf is:
```
No implementation of function Function(<numba.cuda.compiler.DeviceFunctionTemplate object at 0x7f782a179fa0>) found for signature:
>>> count <CUDA device function>(array(int64, 1d, C), array(int64, 1d, C))
There are 2 candidate implementations:
- Of which 2 did not match due to:
Overload in function 'count <CUDA device function>': File: ../../../../test.py: Line 11.
With argument(s): '(array(int64, 1d, C), array(int64, 1d, C))':
Rejected as the implementation raised a specific error:
TypingError: Failed in nopython mode pipeline (step: nopython frontend)
Unknown attribute 'append' of type list(undefined)<iv=None>

File "test.py", line 12:
def count(EVENT_2, EVENT_2A):
    uq = [EVENT_2[0]] + [x for i, x in enumerate(EVENT_2) if i > 0 and EVENT_2[i-1] != x]
    ^

During: typing of get attribute at test.py (12)

File "test.py", line 12:
def count(EVENT_2, EVENT_2A):
    uq = [EVENT_2[0]] + [x for i, x in enumerate(EVENT_2) if i > 0 and EVENT_2[i-1] != x]
    ^

raised from /project/conda_env/lib/python3.8/site-packages/numba/core/typeinfer.py:1071
During: resolving callee type: Function(<numba.cuda.compiler.DeviceFunctionTemplate object at 0x7f782a179fa0>)
During: typing of call at <string> (10)

File "<string>", line 10:
<source missing, REPL/exec in use?>
```
Is this related to numba's deprecation warning? Even when I make uq a static list, I still get the error. Any solutions, either to the list comprehension issue or to my overall problem, are welcome. Thanks.
Shout-out to RAPIDS community member Inzamam, who came up with this elegant solution.
Let's solve the problem holistically. You don't need groupby, and you don't need to manipulate the dataframe directly with for loops; that breaks encapsulation and parallelization and forfeits the advantages of GPU computing. Instead, you can leverage the appropriate data structures the way they are meant to be used within the dataframe API. Here is an example.
```python
import cudf
import numpy as np  # only to create a really large array to test scale

### Your original data
# data = {
#     "EVENT_1": [0, 0, 0, 0, 0, 0, 0, 0, 1, 1],
#     "EVENT_2": [3, 3, 3, 5, 5, 5, 9, 9, 6, 6],
# }

### Your data at scale (10,000,000 rows)
data = {
    "EVENT_1": np.random.default_rng().integers(0, 10, 10000000),
    "EVENT_2": np.random.default_rng().integers(12, 20, 10000000),
}

df = cudf.DataFrame(data)

def ngroup_test(df, col1, col2, col3):
    # Build a composite "EVENT_1,EVENT_2" key so each (group, value) pair is unique
    df[col3] = df[col1].astype(str) + ',' + df[col2].astype(str)
    mapping = {}
    d = {}
    last_index = {}
    # Walk the unique keys once on the host, assigning a 0-based index
    # per value within each group
    for marker in df[col3].unique().to_array():
        first, second = marker.split(',')
        if first not in d:
            d[first] = {second: 0}
            last_index[first] = 1
        elif second not in d[first]:
            d[first][second] = last_index[first]
            last_index[first] += 1
        mapping[marker] = d[first][second]
    # Map every row's key to its index and write it back as the new column
    col_to_insert = list(map(lambda x: mapping[x], list(df[col3].to_array())))
    df[col3] = col_to_insert
    return df

df1 = ngroup_test(df, 'EVENT_1', 'EVENT_2', 'EVENT_2A')
df1
```
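One more note on the original error: Numba's CUDA target does not support dynamic Python lists (or list comprehensions) inside device functions, which is why the kernel fails to compile even though the same comprehension works under @jit(nopython=True) on the CPU. If you did want to stay with apply_grouped, a common workaround is a fixed-size cuda.local.array in place of the list. Here is a minimal sketch, assuming a hypothetical upper bound MAX_UNIQUE on the number of unique EVENT_2 values per group:

```python
from numba import cuda, int64

MAX_UNIQUE = 128  # hypothetical upper bound on unique values per group

def count(EVENT_2, EVENT_2A):
    # A fixed-size local array stands in for the Python list, which the
    # CUDA typing pipeline cannot handle in device code
    uq = cuda.local.array(MAX_UNIQUE, int64)
    n_uq = 0
    for i in range(len(EVENT_2)):
        if i == 0 or EVENT_2[i - 1] != EVENT_2[i]:
            uq[n_uq] = EVENT_2[i]
            n_uq += 1
    # Each thread labels a strided slice of the group's rows
    for i in range(cuda.threadIdx.x, len(EVENT_2), cuda.blockDim.x):
        for j in range(n_uq):
            if uq[j] == EVENT_2[i]:
                EVENT_2A[i] = j
                break
```

The trade-off is that the array size must be a compile-time constant, so you need to know (or generously overestimate) the maximum number of unique values per group in advance.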