在每个 rdd 中应用不同的操作

Applying a distinct operation within each rdd

我有一些非常大的 pyspark 数据框,其中包含许多重复行。但是,在我的用例中,进行完整 distinct() 的洗牌在时间成本上是不值得的。相反,我希望只在每个分区内应用 distinct。但是,我不知道该怎么做。我试过:

>>> spark = SparkSession.builder.appName('foobar').getOrCreate()
>>> data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
>>> df = spark.createDataFrame(data)
>>> df.rdd.mapPartitions(lambda p: p.distinct()).collect()

>>> spark = SparkSession.builder.appName('foobar').getOrCreate()
>>> data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
>>> df = spark.createDataFrame(data)
>>> df.foreachPartition(lambda p: p.distinct()).collect()

但在这两种情况下我都得到

AttributeError: 'itertools.chain' object has no attribute 'distinct'

您能否就如何实现这一点提出建议?

mapPartitionsp的类型是itertools.chain,不是RDD。您可以使用如下函数 return 链中的唯一元素,然后在对 mapPartitions:

的调用中使用它
def uniq(chain):
    seen = set()
    for x in chain:
        if x in seen:
            continue
        seen.add(x)
        yield x

rdd = sc.parallelize([1,2,2,3,3,4])
rdd.repartition(1).mapPartitions(uniq).foreach(print)

# outputs
# 1
# 2
# 3
# 4

我已将上面的单个分区重新分区以模拟整个示例数据集的唯一性,但假设您已经对数据进行了适当分区,这将正常工作。

编辑:

性能

我不太了解 spark 性能,但总的来说,上面的 uniq 函数应该比 distinct 更快,因为我们明确地在单个分区上操作,而不是 spark 必须协调跨多个分区的独特性。

即使恰好只有 1 个分区,我怀疑仍然存在一层协调,如下面的结果所示 - 我真的很惊讶使用 uniq 的速度比 distinct 在单个分区上...

由于当地的样本量非常小 运行,以下内容并不具体,但希望它能提供一个粗略的指标。

# setup
rdd = sc.parallelize(["hello"] * 1000000).repartition(1)
rdd2 = rdd.repartition(2)

import timeit

# uniq 1 partition
timeit.timeit(lambda: rdd.mapPartitions(uniq).foreach(print), number=100)
# snip output...
17.04100012999993

# uniq 2 partitions
timeit.timeit(lambda: rdd2.mapPartitions(uniq).foreach(print), number=100)
# snip output...
9.535805986000014

# distinct 1 partition
timeit.timeit(lambda: rdd.distinct().foreach(print), number=100)
# snip output...
74.313582924

# distinct 2 partitions
timeit.timeit(lambda: rdd2.distinct().foreach(print), number=100)
# snip output...
38.47051327800011

所以 mapPartitions(uniq) 方法的性能似乎要好一些(请记住,以上是一个相当有限的测试)。