Apache beam 在 2 个 pcollection 之间左连接

Apache beam left join between 2 pcollections

我正在尝试在 pcollection 和它的副本之间进行左连接,所以我正在寻找这样的东西:

((colA, colB, colC, colD))
(a,b,e,f)
(a,b,g,h)
(a,b,i,j)
(c,d,k,l)
(c,d,m,n)

对 colA 和 colB 进行左连接,结果如下所示:

(e,f, g,h)
(e,f, i,j)
(g,h, i,j)

(k,l, m,n)

我使用 apache beam dataframe 来解决它:

df = to_dataframe(pcol)

with dataframe.allow_non_parallel_operations():
     res = df.merge(right=df, left_on=['colA', 'colB'], right_on=['colA', 'colB'])
pcoll = to_pcollection(res)

它工作正常,但是当我必须处理大行的 pcollection 时,出现内存不足错误(这是预期的)

现在我正在寻找 df.merger() 的替代方案,但使用 pcollection 这样我就不会遇到内存错误

如果有人对此问题感兴趣

我想到了另一种逻辑。首先,我按键对记录进行分组,如下所示:

((a,b),(e,f))
((a,b),(g,h))
((a,b),(i,j))
((c,d),(k,l))
((c,d),(m,n))

之后我使用 GroupByKey
组合它们 在下一个转换中我尝试循环抛出所有可能的组合

class combineLev(beam.DoFn):
    #this act like df.merge
    def process(self, element):
        (k, v) = element
        v_ = list(v)
        for i in range(len(v_)):
            for j in range(i,len(v_)):
                if v_[i][1] != v_[j][1]:
                    #print(list_[i][1], list_[j][1])
                    yield (v_[i], v_[j])