为什么 UDF 不是 运行 在可用的执行器上并行?

Why is UDF not running in parallel on available executors?

我有一个微型 spark Dataframe,它本质上是将字符串推送到 UDF 中。我期待,因为 .repartition(3)targets 的长度相同,run_sequential 内的处理将应用于可用的执行器 - 即应用于 3 个不同的执行器。

问题是只使用了 1 个执行器。我如何并行执行此处理以强制我的 pyspark 脚本将 target 的每个元素分配给不同的执行程序?

import pandas as pd
import pyspark.sql.functions as F

def run_parallel(config):
 
  def run_sequential(target):
    
    #process with target variable
    pass
    
  return F.udf(run_sequential)

targets = ["target_1", "target_2", "target_3"]

config = {}

pdf = spark.createDataFrame(pd.DataFrame({"targets": targets})).repartition(3)

pdf.withColumn(
    "apply_udf", run_training_parallel(config)("targets")
).collect()

这里的问题是重新分区 DataFrame 并不能保证所有创建的分区都具有相同的大小。由于记录数量如此之少,其中一些记录很可能会映射到同一分区。 Spark 不适合处理如此小的数据集,它的算法专为高效处理大量数据而设计 - 如果您的数据集有 300 万条记录,并且您将其分成 大约 1 的 3 个分区每百万条记录,在大多数情况下,每个分区几条记录的差异是微不足道的。重新分区3条记录显然不是这样。

您可以使用df.rdd.glom().map(len).collect()检查重新分区前后分区的大小,以查看分布变化。

$ pyspark --master "local[3]"
...
>>> pdf = spark.createDataFrame([("target_1",), ("target_2",), ("target_3",)]).toDF("targets")
>>> pdf.rdd.glom().map(len).collect()
[1, 1, 1]
>>> pdf.repartition(3).rdd.glom().map(len).collect()
[0, 2, 1]

如您所见,生成的分区是不均匀的,在我的例子中第一个分区实际上是空的。具有讽刺意味的是,原始数据帧具有所需的 属性 而那个被 repartition().

破坏了

虽然您的特定情况不是 Spark 通常针对的情况,但仍然可以强制将三个记录分布在三个分区中。您需要做的就是提供一个明确的分区键。 RDDs 有 zipWithIndex() 方法,用它的 ID 扩展每条记录。 ID 是完美的分区键,因为它的值以 0 开头并以 1 递增。

>>> new_df = (pdf
      .coalesce(1)  # not part of the solution - see below
      .rdd                         # Convert to RDD
      .zipWithIndex()              # Append ID to each record
      .map(lambda x: (x[1], x[0])) # Make record ID come first
      .partitionBy(3)              # Repartition
      .map(lambda x: x[1])         # Remove record ID
      .toDF())                     # Turn back into a dataframe
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]

上述代码中加入coalesce(1)只是为了说明最终分区不受pdf最初每个分区一条记录的影响。

DataFrame-only 解决方案是首先将 pdf 合并到一个分区,然后使用 repartition(3)。在没有提供分区列的情况下,DataFrame.repartition() 使用 round-robin 分区程序,因此将实现所需的分区。您不能简单地执行 pdf.coalesce(1).repartition(3),因为 Catalyst(Spark 查询优化引擎)优化了合并操作,因此必须在两者之间插入 partitioning-dependent 操作。添加包含 F.monotonically_increasing_id() 的列是此类操作的理想选择。

>>> new_df = (pdf
      .coalesce(1)
      .withColumn("id", F.monotonically_increasing_id())
      .repartition(3))
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]

请注意,与 RDD-based 解决方案不同,coalesce(1) 是解决方案的一部分。