Python 修改了 cuDF 中的 groupby ngroup 列表理解

Python modified groupby ngroup in cuDF with list comprehension

我正在尝试编写一个类似于 pandas 的 groupby().ngroups() function 的函数。不同之处在于我希望每个子组计数从 0 重新开始。因此给出以下数据:

| EVENT_1 | EVENT_2 |
| ------- | ------- |
|       0 |       3 | 
|       0 |       3 |
|       0 |       3 |
|       0 |       5 |
|       0 |       5 |
|       0 |       5 |
|       0 |       9 |
|       0 |       9 |
|       0 |       9 |
|       1 |       6 |
|       1 |       6 |

我要

| EVENT_1 | EVENT_2 | EVENT_2A |
| ------- | ------- | -------- |
|       0 |       3 |        0 |
|       0 |       3 |        0 |
|       0 |       3 |        0 |
|       0 |       5 |        1 |
|       0 |       5 |        1 |
|       0 |       5 |        1 |
|       0 |       9 |        2 |
|       0 |       9 |        2 |
|       1 |       6 |        0 |
|       1 |       6 |        0 |

我能想到的最佳实现方式是在 EVENT_1 上执行 groupby(),在每个组中获取 EVENT_2 的唯一值,然后设置 EVENT_2A作为唯一值的索引。例如,在EVENT_1 == 0组中,唯一值是[3, 5, 9],然后我们将EVENT_2A设置为EVENT_2中对应值的唯一值列表中的索引。

我写的代码在这里。请注意,EVENT_2 总是根据 EVENT_1 排序,因此在 O(n) 中找到这样的唯一值应该可行。

import cudf
from numba import cuda
import numpy as np

def count(EVENT_2, EVENT_2A):
    # Get unique values of EVENT_2
    uq = [EVENT_2[0]] + [x for i, x in enumerate(EVENT_2) if i > 0 and EVENT_2[i-1] != x]

    for i in range(cuda.threadIdx.x, len(EVENT_2), cuda.blockDim.x):
        # Get corresponding index for each value. This can probably be sped up by mapping 
        # values to indices
        for j, v in enumerate(uq):
            if v == EVENT_2[i]:
                EVENT_2A[i] = j
                break


if __name__ == "__main__":
    data = {
        "EVENT_1":[0,0,0,0,0,0,0,0,1,1],
        "EVENT_2":[3,3,3,5,5,5,9,9,6,6]
    }
    df = cudf.DataFrame(data)
    results = df.groupby(["EVENT_1"], method="cudf").apply_grouped(
        count, 
        incols=["EVENT_2"], 
        outcols={"EVENT_2A":np.int64}
    )
    print(results.sort_index())

问题在于,在用户定义的函数 count() 中使用列表似乎存在错误。 Numba 说它的 JIT nopython 编译器可以处理 list comprehension 实际上当我使用函数

from numba import jit

@jit(nopython=True)
def uq_sorted(my_list):
    return [my_list[0]] + [x for i, x in enumerate(my_list) if i > 0 and my_list[i-1] != x]

虽然有弃用警告,但它有效。

我使用 cudf 得到的错误是

No implementation of function Function(<numba.cuda.compiler.DeviceFunctionTemplate object at 0x7f782a179fa0>) found for signature:
 
 >>> count <CUDA device function>(array(int64, 1d, C), array(int64, 1d, C))
 
There are 2 candidate implementations:
  - Of which 2 did not match due to:
  Overload in function 'count <CUDA device function>': File: ../../../../test.py: Line 11.
    With argument(s): '(array(int64, 1d, C), array(int64, 1d, C))':
   Rejected as the implementation raised a specific error:
     TypingError: Failed in nopython mode pipeline (step: nopython frontend)
   Unknown attribute 'append' of type list(undefined)<iv=None>
   
   File "test.py", line 12:
   def count(EVENT_2, EVENT_2A):
       uq = [EVENT_2[0]] + [x for i, x in enumerate(EVENT_2) if i > 0 and EVENT_2[i-1] != x]
       ^
   
   During: typing of get attribute at test.py (12)
   
   File "test.py", line 12:
   def count(EVENT_2, EVENT_2A):
       uq = [EVENT_2[0]] + [x for i, x in enumerate(EVENT_2) if i > 0 and EVENT_2[i-1] != x]
       ^

  raised from /project/conda_env/lib/python3.8/site-packages/numba/core/typeinfer.py:1071

During: resolving callee type: Function(<numba.cuda.compiler.DeviceFunctionTemplate object at 0x7f782a179fa0>)
During: typing of call at <string> (10)


File "<string>", line 10:
<source missing, REPL/exec in use?>

这与 numba 的弃用警告有关吗?即使我将 uq 设置为静态列表,我仍然会收到错误消息。欢迎对列表理解问题或我的整个问题提出任何解决方案。谢谢。

向提出这个优雅解决方案的 RAPIDS 社区成员Inzamam 致敬。

让我们从整体上解决问题。您不需要 groupby 或直接使用 for 循环操作数据框。这破坏了封装和并行化,失去了 GPU 计算的优势。您可以利用适当的数据结构,因为它们打算在数据帧 API 中使用。这是一个例子。

import cudf
import numpy as np #only to create a really large array to test scale

### Your Original data
# data = { 
#         "EVENT_1":[0,0,0,0,0,0,0,0,1,1],
#         "EVENT_2":[3,3,3,5,5,5,9,9,6,6]
#     }

### your data at scale (10,000,000 rows)    
data = {
    "EVENT_1":np.random.default_rng().integers(0,10,10000000),
    "EVENT_2":np.random.default_rng().integers(12,20,10000000)
}
df = cudf.DataFrame(data)

from collections import defaultdict

def ngroup_test(df, col1, col2, col3):
    df[col3] = df[col1].astype(str) + ',' + df[col2].astype(str)
    mapping = {}
    d = {}
    last_index = {}
    for marker in df[col3].unique().to_array():
        first, second = marker.split(',')
        if first not in d:
            d[first] = {second: 0}
            last_index[first] = 1
        elif second not in d[first]:
            d[first][second] = last_index[first]
            last_index[first] += 1
        mapping[marker] = d[first][second]

    col_to_insert = list(map(lambda x: mapping[x], list(df[col3].to_array())))
    df[col3] = col_to_insert
    return df

df1 = ngroup_test(df, 'EVENT_1', 'EVENT_2', 'EVENT_2A')
df1