从 PySpark RDD 中删除重复的元组对
Remove duplicate tuple pairs from PySpark RDD
我得到了一个rdd。例子:
测试 = sc.parallelize([(1,0), (2,0), (3,0)])
我需要获取笛卡尔积并删除具有重复条目的结果元组对。
在这个玩具示例中,这些将是 ((1, 0), (1, 0)), ((2, 0), (2, 0)), ((3, 0), (3, 0)).
我可以按如下方式得到笛卡尔积:注意 collect 和 print 语句仅用于
故障排除。
def compute_cartesian(rdd):
result1 = sc.parallelize(sorted(rdd.cartesian(rdd).collect()))
print(type(result1))
print(result1.collect())
我这个阶段的类型和输出是正确的:
<class 'pyspark.rdd.RDD'>
[((1, 0), (1, 0)), ((1, 0), (2, 0)), ((1, 0), (3, 0)), ((2, 0), (1, 0)), ((2, 0), (2, 0)), ((2, 0), (3, 0)), ((3, 0), (1, 0)), ((3, 0), (2, 0)), ((3, 0), (3, 0))]
但现在我需要删除三对具有重复条目的元组。
到目前为止尝试过:
- .distinct() 这 运行s 但不会产生正确的结果 rdd.
- .dropDuplicates() 不会 运行。我认为这是 .dropDuplicates() 的错误用法。
- 手动功能:
没有 RDD,这个任务很简单。
# Remove duplicates
for elem in result:
if elem[0] == elem[1]:
result.remove(elem)
print(result)
print("After: ", len(result))
这是我编写的一个函数,它删除重复的元组对,然后吐出生成的 len,这样我就可以进行健全性检查。
我只是不确定如何直接对 RDD 执行操作,在这种情况下删除笛卡尔积和 return 一个 RDD 产生的任何重复元组对。
是的,我可以 .collect() 它,执行操作,然后将其重新键入为 RDD,但这违背了目的。假设这是数十亿对。我需要对 rdd 和 return 一个 rdd.
执行操作
您可以使用 filter
删除不需要的对:
dd.cartesian(rdd).filter(lambda x: x[0] != x[1])
请注意,我不会将这些对称为“重复对”,而是“重复对”或更好的“对角线对”:如果您以几何方式可视化笛卡尔积,它们对应于对角线。
这就是 distinct
和 dropDuplicates
在这里不合适的原因:它们删除了重复项,这不是您想要的。例如,[1,1,2].distinct()
是 [1,2]
。
我得到了一个rdd。例子: 测试 = sc.parallelize([(1,0), (2,0), (3,0)])
我需要获取笛卡尔积并删除具有重复条目的结果元组对。 在这个玩具示例中,这些将是 ((1, 0), (1, 0)), ((2, 0), (2, 0)), ((3, 0), (3, 0)).
我可以按如下方式得到笛卡尔积:注意 collect 和 print 语句仅用于 故障排除。
def compute_cartesian(rdd):
result1 = sc.parallelize(sorted(rdd.cartesian(rdd).collect()))
print(type(result1))
print(result1.collect())
我这个阶段的类型和输出是正确的:
<class 'pyspark.rdd.RDD'>
[((1, 0), (1, 0)), ((1, 0), (2, 0)), ((1, 0), (3, 0)), ((2, 0), (1, 0)), ((2, 0), (2, 0)), ((2, 0), (3, 0)), ((3, 0), (1, 0)), ((3, 0), (2, 0)), ((3, 0), (3, 0))]
但现在我需要删除三对具有重复条目的元组。
到目前为止尝试过:
- .distinct() 这 运行s 但不会产生正确的结果 rdd.
- .dropDuplicates() 不会 运行。我认为这是 .dropDuplicates() 的错误用法。
- 手动功能:
没有 RDD,这个任务很简单。
# Remove duplicates
for elem in result:
if elem[0] == elem[1]:
result.remove(elem)
print(result)
print("After: ", len(result))
这是我编写的一个函数,它删除重复的元组对,然后吐出生成的 len,这样我就可以进行健全性检查。
我只是不确定如何直接对 RDD 执行操作,在这种情况下删除笛卡尔积和 return 一个 RDD 产生的任何重复元组对。
是的,我可以 .collect() 它,执行操作,然后将其重新键入为 RDD,但这违背了目的。假设这是数十亿对。我需要对 rdd 和 return 一个 rdd.
执行操作您可以使用 filter
删除不需要的对:
dd.cartesian(rdd).filter(lambda x: x[0] != x[1])
请注意,我不会将这些对称为“重复对”,而是“重复对”或更好的“对角线对”:如果您以几何方式可视化笛卡尔积,它们对应于对角线。
这就是 distinct
和 dropDuplicates
在这里不合适的原因:它们删除了重复项,这不是您想要的。例如,[1,1,2].distinct()
是 [1,2]
。