RAPIDS:如何在使用另一个数据帧的 apply_rows 调用的 UDF 中使用一个数据帧?

RAPIDS: How to use one dataframe in a UDF called with apply_rows of another dataframe?

对于数据帧 A 中的每一行,我需要查询 DF B。我需要做这样的事情:按列 b1 (B.b1) 中的值过滤 B 行,这些值在列 A 定义的范围内。 a1 和 A.a2 并将组合值分配给列 A.a3.

在 pandas 中类似于:

A.a1 = B[(B.b1>A.a2) & (B.b1<A.a3)]['b2'].values

我尝试在 UDF 的函数参数中传递数据帧,但出现错误:

ValueError: Cannot determine Numba type of <class 'cudf.core.dataframe.DataFrame'>

下面是使用 Pandas 的工作 Python 示例。

toyevents = pd.DataFrame.from_dict({'end': {0: 8.748356416,
         1: 8.752231441000001,
         2: 8.756627850000001,
         3: 8.760818359,
         4: 8.765967569,
         5: 8.77041589,
         6: 8.774226174,
         7: 8.776358813,
         8: 8.77866835,
         9: 8.780719302000001},
 'name_id': {0: 18452.0,
             1: 20586.0,
             2: 20491.0,
             3: 20610.0,
             4: 20589.0,
             5: 20589.0,
             6: 19165.0,
             7: 20589.0,
             8: 20586.0,
             9: 19064.0},
 'start': {0: 8.748299848,
           1: 8.752229263,
           2: 8.756596980000001,
           3: 8.760816603,
           4: 8.765957310000001,
           5: 8.770381615,
           6: 8.77414259,
           7: 8.776349745000001,
           8: 8.778666861000001,
           9: 8.780674982}})

toynvtx = pd.DataFrame.from_dict({'NvtxEvent.Text': {0: 'Iteration 32',
                    1: 'FWD pass',
                    2: 'Prediction and loss',
                    3: 'BWD pass',
                    4: 'Optimizer update'},
 'end': {0: 8.802574018000001,
         1: 8.771325765,
         2: 8.771688249,
         3: 8.792846429,
         4: 8.802333183},
 'start': {0: 8.744061385,
           1: 8.747272157000001,
           2: 8.771329333,
           3: 8.771691628000001,
           4: 8.792851876}})

# Search NVTX ranges encompassing [start,end] range.
def pickNVTX(r,nvtx):
    start = r['start']
    end = r['end']
    start_early = nvtx[nvtx['start'] <= start]
    end_later = start_early[start_early['end'] >= end]
    return ','.join(end_later['NvtxEvent.Text'])

# Using apply()
toyevents.loc[:,'nvtx'] = toyevents_.apply(pickNVTX,nvtx=toynvtx,axis=1)

# Method 2. Using iterrows()
for i, row in toyevents.iterrows():
    toyevents.loc[i, 'nvtx'] = ','.join(
        toynvtx[(toynvtx.start <= row.start)
                & (toynvtx.end >= row.end)]['NvtxEvent.Text'].values)

对于此类问题,您可能希望使用不等式(条件)连接。 pandas、cuDF 或 BlazingSQL 当前不支持此功能。

如果您的数据不是很大,您可以结合使用交叉连接、布尔掩码和 groupby collect_list。如果您提供第二个数据帧作为参数,那么 UDF 也可能会起作用,这样您就可以对其进行索引并循环(但这会变得混乱且效率低下)。

您的示例的输出是:

        end  name_id     start                   nvtx
0  8.748356  18452.0  8.748300  Iteration 32,FWD pass
1  8.752231  20586.0  8.752229  Iteration 32,FWD pass
2  8.756628  20491.0  8.756597  Iteration 32,FWD pass
3  8.760818  20610.0  8.760817  Iteration 32,FWD pass
4  8.765968  20589.0  8.765957  Iteration 32,FWD pass
5  8.770416  20589.0  8.770382  Iteration 32,FWD pass
6  8.774226  19165.0  8.774143  Iteration 32,BWD pass
7  8.776359  20589.0  8.776350  Iteration 32,BWD pass
8  8.778668  20586.0  8.778667  Iteration 32,BWD pass
9  8.780719  19064.0  8.780675  Iteration 32,BWD pass

以下代码将提供相同的输出,但带有列表列而不是字符串列。

# put the example data on the GPU
toyevents = cudf.from_pandas(toyevents)
toynvtx = cudf.from_pandas(toynvtx)
​
# cross join
toyevents['key'] = 1
toynvtx['key'] = 1
merged = toyevents.merge(toynvtx, how="outer", on="key")
del merged["key"]

# filter
mask = (merged.start_y <= merged.start_x) & (merged.end_y >= merged.end_x)
del merged["start_y"], merged["end_y"]

# collect list
merged[mask].groupby(["end_x", "name_id", "start_x"])["NvtxEvent.Text"].agg(list)
end_x     name_id  start_x 
8.748356  18452.0  8.748300    [Iteration 32, FWD pass]
8.752231  20586.0  8.752229    [Iteration 32, FWD pass]
8.756628  20491.0  8.756597    [Iteration 32, FWD pass]
8.760818  20610.0  8.760817    [Iteration 32, FWD pass]
8.765968  20589.0  8.765957    [Iteration 32, FWD pass]
8.770416  20589.0  8.770382    [Iteration 32, FWD pass]
8.774226  19165.0  8.774143    [Iteration 32, BWD pass]
8.776359  20589.0  8.776350    [Iteration 32, BWD pass]
8.778668  20586.0  8.778667    [Iteration 32, BWD pass]
8.780719  19064.0  8.780675    [Iteration 32, BWD pass]
Name: NvtxEvent.Text, dtype: list