为什么 UDF 不是 运行 在可用的执行器上并行?
Why is UDF not running in parallel on available executors?
我有一个微型 spark Dataframe,它本质上是将字符串推送到 UDF 中。我期待,因为 .repartition(3)
与 targets
的长度相同,run_sequential
内的处理将应用于可用的执行器 - 即应用于 3 个不同的执行器。
问题是只使用了 1 个执行器。我如何并行执行此处理以强制我的 pyspark 脚本将 target
的每个元素分配给不同的执行程序?
import pandas as pd
import pyspark.sql.functions as F
def run_parallel(config):
def run_sequential(target):
#process with target variable
pass
return F.udf(run_sequential)
targets = ["target_1", "target_2", "target_3"]
config = {}
pdf = spark.createDataFrame(pd.DataFrame({"targets": targets})).repartition(3)
pdf.withColumn(
"apply_udf", run_training_parallel(config)("targets")
).collect()
这里的问题是重新分区 DataFrame
并不能保证所有创建的分区都具有相同的大小。由于记录数量如此之少,其中一些记录很可能会映射到同一分区。 Spark 不适合处理如此小的数据集,它的算法专为高效处理大量数据而设计 - 如果您的数据集有 300 万条记录,并且您将其分成 大约 1 的 3 个分区每百万条记录,在大多数情况下,每个分区几条记录的差异是微不足道的。重新分区3条记录显然不是这样。
您可以使用df.rdd.glom().map(len).collect()
检查重新分区前后分区的大小,以查看分布变化。
$ pyspark --master "local[3]"
...
>>> pdf = spark.createDataFrame([("target_1",), ("target_2",), ("target_3",)]).toDF("targets")
>>> pdf.rdd.glom().map(len).collect()
[1, 1, 1]
>>> pdf.repartition(3).rdd.glom().map(len).collect()
[0, 2, 1]
如您所见,生成的分区是不均匀的,在我的例子中第一个分区实际上是空的。具有讽刺意味的是,原始数据帧具有所需的 属性 而那个被 repartition()
.
破坏了
虽然您的特定情况不是 Spark 通常针对的情况,但仍然可以强制将三个记录分布在三个分区中。您需要做的就是提供一个明确的分区键。 RDDs 有 zipWithIndex()
方法,用它的 ID 扩展每条记录。 ID 是完美的分区键,因为它的值以 0 开头并以 1 递增。
>>> new_df = (pdf
.coalesce(1) # not part of the solution - see below
.rdd # Convert to RDD
.zipWithIndex() # Append ID to each record
.map(lambda x: (x[1], x[0])) # Make record ID come first
.partitionBy(3) # Repartition
.map(lambda x: x[1]) # Remove record ID
.toDF()) # Turn back into a dataframe
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]
上述代码中加入coalesce(1)
只是为了说明最终分区不受pdf
最初每个分区一条记录的影响。
DataFrame-only 解决方案是首先将 pdf
合并到一个分区,然后使用 repartition(3)
。在没有提供分区列的情况下,DataFrame.repartition()
使用 round-robin 分区程序,因此将实现所需的分区。您不能简单地执行 pdf.coalesce(1).repartition(3)
,因为 Catalyst(Spark 查询优化引擎)优化了合并操作,因此必须在两者之间插入 partitioning-dependent 操作。添加包含 F.monotonically_increasing_id()
的列是此类操作的理想选择。
>>> new_df = (pdf
.coalesce(1)
.withColumn("id", F.monotonically_increasing_id())
.repartition(3))
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]
请注意,与 RDD-based 解决方案不同,coalesce(1)
是解决方案的一部分。
我有一个微型 spark Dataframe,它本质上是将字符串推送到 UDF 中。我期待,因为 .repartition(3)
与 targets
的长度相同,run_sequential
内的处理将应用于可用的执行器 - 即应用于 3 个不同的执行器。
问题是只使用了 1 个执行器。我如何并行执行此处理以强制我的 pyspark 脚本将 target
的每个元素分配给不同的执行程序?
import pandas as pd
import pyspark.sql.functions as F
def run_parallel(config):
def run_sequential(target):
#process with target variable
pass
return F.udf(run_sequential)
targets = ["target_1", "target_2", "target_3"]
config = {}
pdf = spark.createDataFrame(pd.DataFrame({"targets": targets})).repartition(3)
pdf.withColumn(
"apply_udf", run_training_parallel(config)("targets")
).collect()
这里的问题是重新分区 DataFrame
并不能保证所有创建的分区都具有相同的大小。由于记录数量如此之少,其中一些记录很可能会映射到同一分区。 Spark 不适合处理如此小的数据集,它的算法专为高效处理大量数据而设计 - 如果您的数据集有 300 万条记录,并且您将其分成 大约 1 的 3 个分区每百万条记录,在大多数情况下,每个分区几条记录的差异是微不足道的。重新分区3条记录显然不是这样。
您可以使用df.rdd.glom().map(len).collect()
检查重新分区前后分区的大小,以查看分布变化。
$ pyspark --master "local[3]"
...
>>> pdf = spark.createDataFrame([("target_1",), ("target_2",), ("target_3",)]).toDF("targets")
>>> pdf.rdd.glom().map(len).collect()
[1, 1, 1]
>>> pdf.repartition(3).rdd.glom().map(len).collect()
[0, 2, 1]
如您所见,生成的分区是不均匀的,在我的例子中第一个分区实际上是空的。具有讽刺意味的是,原始数据帧具有所需的 属性 而那个被 repartition()
.
虽然您的特定情况不是 Spark 通常针对的情况,但仍然可以强制将三个记录分布在三个分区中。您需要做的就是提供一个明确的分区键。 RDDs 有 zipWithIndex()
方法,用它的 ID 扩展每条记录。 ID 是完美的分区键,因为它的值以 0 开头并以 1 递增。
>>> new_df = (pdf
.coalesce(1) # not part of the solution - see below
.rdd # Convert to RDD
.zipWithIndex() # Append ID to each record
.map(lambda x: (x[1], x[0])) # Make record ID come first
.partitionBy(3) # Repartition
.map(lambda x: x[1]) # Remove record ID
.toDF()) # Turn back into a dataframe
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]
上述代码中加入coalesce(1)
只是为了说明最终分区不受pdf
最初每个分区一条记录的影响。
DataFrame-only 解决方案是首先将 pdf
合并到一个分区,然后使用 repartition(3)
。在没有提供分区列的情况下,DataFrame.repartition()
使用 round-robin 分区程序,因此将实现所需的分区。您不能简单地执行 pdf.coalesce(1).repartition(3)
,因为 Catalyst(Spark 查询优化引擎)优化了合并操作,因此必须在两者之间插入 partitioning-dependent 操作。添加包含 F.monotonically_increasing_id()
的列是此类操作的理想选择。
>>> new_df = (pdf
.coalesce(1)
.withColumn("id", F.monotonically_increasing_id())
.repartition(3))
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]
请注意,与 RDD-based 解决方案不同,coalesce(1)
是解决方案的一部分。