如何按行对Spark Dataframe进行并行计算?

How to perform parallel computation on Spark Dataframe by row?

我有 300 000 个点的集合,我想计算它们之间的距离。

    id      x    y
0   0       1    0
1   1       28   76
…

因此,我在这些点之间做了笛卡尔积,然后进行过滤,例如我只保留一个点组合。事实上,为了我的目的,点之间的距离 (0, 1)(1,0)

相同
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import math


@udf(returnType=IntegerType())
def compute_distance(x1,y1, x2,y2):
    return math.square(math.pow(x1-x2) + math.pow(y1-y2))


columns = ['id','x', 'y']
data = [(0, 1, 0), (1, 28,76), (2, 33,42)]
spark = SparkSession\
            .builder \
            .appName('distance computation') \
            .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
            .config('spark.executor.memory', '2g') \
            .master('local[20]') \
            .getOrCreate()
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
result = df.alias('a')\
               .join(df.alias('b'),
                     F.array(*['a.id']) < F.array(*['b.id']))\
               .withColumn('distance', compute_distance(F.col('a.x'), F.col('a.y'), F.col('b.x'), F.col('b.y')))

result.write.parquet('distance-between-points')

虽然这似乎有效,但我最近的任务 (parquet at NativeMethodAccessorImpl.java:0) 的 CPU 使用率并未超过 100%。另外,花了一天的时间才完成。

我想知道 withColumn 操作是否在多个执行器上执行以实现并行性?

有没有办法拆分数据以便批量计算距离并将结果存储在一个或多个 Parquet 文件中?

感谢您的见解。

I would like to know if the withColumn operation is performed on multiple executor in order to achieve parallelism ?

是的,假设集群配置正确,数据帧将在您的集群中进行分区,执行程序将并行处理分区 运行 您的 UDF。

Is there a way to split the data in order to compute distance by batch in // and to store them into one or multiples parquet files ?

默认情况下,生成的数据帧将跨集群分区,并作为每个分区的一个 Parquet 文件写出。如果需要,您可以通过重新分区来更改它,但这会导致随机播放并花费更长的时间。

我推荐 Learning Spark 书中的 'Level of Parallelism' 部分以供进一步阅读。