如何使用 pyspark 在 txt 文件 RDD 中找到值列表的交集?
How do I find intersection of value lists in a txt file RDD with pyspark?
我正在学习 spark,想处理文件中所有值的交集
文件格式如下所示:
a
1, 2, 3, 0, ...
b
0, 5, 20, 3, ...
c
0, 7, 9, 10, 2, 20, ...
d
empty
e
empty
我尝试执行以下操作:
rdd = spark.sparkContext.textFile('data.txt')
rdd1 = rdd.map(lambda x: x.split('\t')).map(lambda x: (x[0], x[1])).map(lambda x : (x[0], list(x[1].split('\n,'))))
ab = rdd1.map(lambda x: (x[0], (x[1]))).reduceByKey(lambda x, y: (set(x[0]))).map(lambda x: (x[0], list(set(x[1]))))
我现在有以下格式的数据作为键值对。
[('a', [1, 2, 3, 0, ...]), ('b', [0, 5, 20, 3,...]), ('c', [0, 7, 9, 10, 2, 20, ...]), ...]
我需要找到数据中每个值与其他值的交集,并在 >= 2
.
的位置附上交集长度的键
喜欢:
[key, [list of keys in the entire data whose length of intersections with the current key is >=2]]
例如,键 a
的值与键 b
的值有交集 [0, 3]
。键 a
的值也与键 b
的值有交集 [0, 2]
。类似地,对于键 b
,在 a
和 b
的情况下应该发生同样的事情。那么b
和c
有交集[0, 20]
。最后,d
将被分配 e
,反之亦然,因为两者都是空的。
示例输出:
[('a', [b, c]), ('b', [a, c]), ('c', [a, b]), ('d', [e]), ('e', [d])]
假设您输入的 RDD 类似于:
rdd = spark.sparkContext.parallelize([
('a', [1, 2, 3, 0]), ('b', [0, 5, 20, 3]),
('c', [0, 7, 9, 10, 2, 20]), ('d', []), ('e', []),
])
为了检查所有键组合之间的值交集,您需要在 RDD 上应用笛卡尔积,然后过滤存在交集的值:
from operator import add
def lists_intersect(l1, l2):
if len(l1) == len(l2) == 0: # both empty
return True
if len(set(l1).intersection(l2)) >= 2: # have 2 or more same elements
return True
return False
result = rdd.cartesian(rdd) \
.filter(lambda x: x[0][0] != x[1][0] and lists_intersect(x[0][1], x[1][1])) \
.map(lambda x: (x[0][0], [x[1][0]])) \
.reduceByKey(add)
result.collect()
# [('a', ['b', 'c']), ('d', ['e']), ('c', ['a', 'b']), ('e', ['d']), ('b', ['a', 'c'])]
另一种使用连接而不是笛卡尔积的方法:
# add index to the original rdd
rdd = rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
# generates another rdd that contains pairs of row indices to check
# combinations of all rows except the row itself
indices = range(rdd.count())
indices_rdd = spark.sparkContext.parallelize([
(i, j) for i in indices for j in indices if i != j
])
result = indices_rdd.join(rdd) \
.map(lambda x: (x[1][0], (x[0], x[1][1]))) \
.join(rdd) \
.filter(lambda x: lists_intersect(x[1][0][1][1], x[1][1][1])) \
.map(lambda x: (x[1][0][1][0], [x[1][1][0]])) \
.reduceByKey(add)
我正在学习 spark,想处理文件中所有值的交集
文件格式如下所示:
a
1, 2, 3, 0, ...
b
0, 5, 20, 3, ...
c
0, 7, 9, 10, 2, 20, ...
d
empty
e
empty
我尝试执行以下操作:
rdd = spark.sparkContext.textFile('data.txt')
rdd1 = rdd.map(lambda x: x.split('\t')).map(lambda x: (x[0], x[1])).map(lambda x : (x[0], list(x[1].split('\n,'))))
ab = rdd1.map(lambda x: (x[0], (x[1]))).reduceByKey(lambda x, y: (set(x[0]))).map(lambda x: (x[0], list(set(x[1]))))
我现在有以下格式的数据作为键值对。
[('a', [1, 2, 3, 0, ...]), ('b', [0, 5, 20, 3,...]), ('c', [0, 7, 9, 10, 2, 20, ...]), ...]
我需要找到数据中每个值与其他值的交集,并在 >= 2
.
喜欢:
[key, [list of keys in the entire data whose length of intersections with the current key is >=2]]
例如,键 a
的值与键 b
的值有交集 [0, 3]
。键 a
的值也与键 b
的值有交集 [0, 2]
。类似地,对于键 b
,在 a
和 b
的情况下应该发生同样的事情。那么b
和c
有交集[0, 20]
。最后,d
将被分配 e
,反之亦然,因为两者都是空的。
示例输出:
[('a', [b, c]), ('b', [a, c]), ('c', [a, b]), ('d', [e]), ('e', [d])]
假设您输入的 RDD 类似于:
rdd = spark.sparkContext.parallelize([
('a', [1, 2, 3, 0]), ('b', [0, 5, 20, 3]),
('c', [0, 7, 9, 10, 2, 20]), ('d', []), ('e', []),
])
为了检查所有键组合之间的值交集,您需要在 RDD 上应用笛卡尔积,然后过滤存在交集的值:
from operator import add
def lists_intersect(l1, l2):
if len(l1) == len(l2) == 0: # both empty
return True
if len(set(l1).intersection(l2)) >= 2: # have 2 or more same elements
return True
return False
result = rdd.cartesian(rdd) \
.filter(lambda x: x[0][0] != x[1][0] and lists_intersect(x[0][1], x[1][1])) \
.map(lambda x: (x[0][0], [x[1][0]])) \
.reduceByKey(add)
result.collect()
# [('a', ['b', 'c']), ('d', ['e']), ('c', ['a', 'b']), ('e', ['d']), ('b', ['a', 'c'])]
另一种使用连接而不是笛卡尔积的方法:
# add index to the original rdd
rdd = rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
# generates another rdd that contains pairs of row indices to check
# combinations of all rows except the row itself
indices = range(rdd.count())
indices_rdd = spark.sparkContext.parallelize([
(i, j) for i in indices for j in indices if i != j
])
result = indices_rdd.join(rdd) \
.map(lambda x: (x[1][0], (x[0], x[1][1]))) \
.join(rdd) \
.filter(lambda x: lists_intersect(x[1][0][1][1], x[1][1][1])) \
.map(lambda x: (x[1][0][1][0], [x[1][1][0]])) \
.reduceByKey(add)