如何使用 pyspark 在 txt 文件 RDD 中找到值列表的交集?

How do I find intersection of value lists in a txt file RDD with pyspark?

我正在学习 spark,想处理文件中所有值的交集

文件格式如下所示:

a
1, 2, 3, 0, ...

b
0, 5, 20, 3, ...

c
0, 7, 9, 10, 2, 20, ...

d
empty

e
empty

我尝试执行以下操作:

rdd = spark.sparkContext.textFile('data.txt')
rdd1 = rdd.map(lambda x: x.split('\t')).map(lambda x: (x[0], x[1])).map(lambda x : (x[0], list(x[1].split('\n,'))))
ab = rdd1.map(lambda x: (x[0], (x[1]))).reduceByKey(lambda x, y: (set(x[0]))).map(lambda x: (x[0], list(set(x[1]))))

我现在有以下格式的数据作为键值对。

[('a', [1, 2, 3, 0, ...]), ('b', [0, 5, 20, 3,...]), ('c', [0, 7, 9, 10, 2, 20, ...]), ...]

我需要找到数据中每个值与其他值的交集,并在 >= 2.

的位置附上交集长度的键

喜欢:

[key, [list of keys in the entire data whose length of intersections with the current key is >=2]] 

例如,键 a 的值与键 b 的值有交集 [0, 3]。键 a 的值也与键 b 的值有交集 [0, 2]。类似地,对于键 b,在 ab 的情况下应该发生同样的事情。那么bc有交集[0, 20]。最后,d 将被分配 e,反之亦然,因为两者都是空的。

示例输出:

[('a', [b, c]), ('b', [a, c]), ('c', [a, b]), ('d', [e]), ('e', [d])]

假设您输入的 RDD 类似于:

rdd = spark.sparkContext.parallelize([
    ('a', [1, 2, 3, 0]), ('b', [0, 5, 20, 3]),
    ('c', [0, 7, 9, 10, 2, 20]), ('d', []), ('e', []),
])

为了检查所有键组合之间的值交集,您需要在 RDD 上应用笛卡尔积,然后过滤存在交集的值:

from operator import add

def lists_intersect(l1, l2):
    if len(l1) == len(l2) == 0:  # both empty
        return True
    if len(set(l1).intersection(l2)) >= 2:  # have 2 or more same elements
        return True
    return False


result = rdd.cartesian(rdd) \
    .filter(lambda x: x[0][0] != x[1][0] and lists_intersect(x[0][1], x[1][1])) \
    .map(lambda x: (x[0][0], [x[1][0]])) \
    .reduceByKey(add)

result.collect()
# [('a', ['b', 'c']), ('d', ['e']), ('c', ['a', 'b']), ('e', ['d']), ('b', ['a', 'c'])]

另一种使用连接而不是笛卡尔积的方法:

# add index to the original rdd
rdd = rdd.zipWithIndex().map(lambda x: (x[1], x[0]))

# generates another rdd that contains pairs of row indices to check
# combinations of all rows except the row itself 
indices = range(rdd.count())
indices_rdd = spark.sparkContext.parallelize([
    (i, j) for i in indices for j in indices if i != j
])

result = indices_rdd.join(rdd) \
    .map(lambda x: (x[1][0], (x[0], x[1][1]))) \
    .join(rdd) \
    .filter(lambda x: lists_intersect(x[1][0][1][1], x[1][1][1])) \
    .map(lambda x: (x[1][0][1][0], [x[1][1][0]])) \
    .reduceByKey(add)