如何按行对Spark Dataframe进行并行计算?
How to perform parallel computation on Spark Dataframe by row?
我有 300 000 个点的集合,我想计算它们之间的距离。
id x y
0 0 1 0
1 1 28 76
…
因此,我在这些点之间做了笛卡尔积,然后进行过滤,例如我只保留一个点组合。事实上,为了我的目的,点之间的距离 (0, 1)
与 (1,0)
相同
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import math
@udf(returnType=IntegerType())
def compute_distance(x1,y1, x2,y2):
return math.square(math.pow(x1-x2) + math.pow(y1-y2))
columns = ['id','x', 'y']
data = [(0, 1, 0), (1, 28,76), (2, 33,42)]
spark = SparkSession\
.builder \
.appName('distance computation') \
.config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
.config('spark.executor.memory', '2g') \
.master('local[20]') \
.getOrCreate()
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
result = df.alias('a')\
.join(df.alias('b'),
F.array(*['a.id']) < F.array(*['b.id']))\
.withColumn('distance', compute_distance(F.col('a.x'), F.col('a.y'), F.col('b.x'), F.col('b.y')))
result.write.parquet('distance-between-points')
虽然这似乎有效,但我最近的任务 (parquet at NativeMethodAccessorImpl.java:0
) 的 CPU 使用率并未超过 100%。另外,花了一天的时间才完成。
我想知道 withColumn
操作是否在多个执行器上执行以实现并行性?
有没有办法拆分数据以便批量计算距离并将结果存储在一个或多个 Parquet 文件中?
感谢您的见解。
I would like to know if the withColumn operation is performed on multiple executor in order to achieve parallelism ?
是的,假设集群配置正确,数据帧将在您的集群中进行分区,执行程序将并行处理分区 运行 您的 UDF。
Is there a way to split the data in order to compute distance by batch in // and to store them into one or multiples parquet files ?
默认情况下,生成的数据帧将跨集群分区,并作为每个分区的一个 Parquet 文件写出。如果需要,您可以通过重新分区来更改它,但这会导致随机播放并花费更长的时间。
我推荐 Learning Spark 书中的 'Level of Parallelism' 部分以供进一步阅读。
我有 300 000 个点的集合,我想计算它们之间的距离。
id x y
0 0 1 0
1 1 28 76
…
因此,我在这些点之间做了笛卡尔积,然后进行过滤,例如我只保留一个点组合。事实上,为了我的目的,点之间的距离 (0, 1)
与 (1,0)
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import math
@udf(returnType=IntegerType())
def compute_distance(x1,y1, x2,y2):
return math.square(math.pow(x1-x2) + math.pow(y1-y2))
columns = ['id','x', 'y']
data = [(0, 1, 0), (1, 28,76), (2, 33,42)]
spark = SparkSession\
.builder \
.appName('distance computation') \
.config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
.config('spark.executor.memory', '2g') \
.master('local[20]') \
.getOrCreate()
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
result = df.alias('a')\
.join(df.alias('b'),
F.array(*['a.id']) < F.array(*['b.id']))\
.withColumn('distance', compute_distance(F.col('a.x'), F.col('a.y'), F.col('b.x'), F.col('b.y')))
result.write.parquet('distance-between-points')
虽然这似乎有效,但我最近的任务 (parquet at NativeMethodAccessorImpl.java:0
) 的 CPU 使用率并未超过 100%。另外,花了一天的时间才完成。
我想知道 withColumn
操作是否在多个执行器上执行以实现并行性?
有没有办法拆分数据以便批量计算距离并将结果存储在一个或多个 Parquet 文件中?
感谢您的见解。
I would like to know if the withColumn operation is performed on multiple executor in order to achieve parallelism ?
是的,假设集群配置正确,数据帧将在您的集群中进行分区,执行程序将并行处理分区 运行 您的 UDF。
Is there a way to split the data in order to compute distance by batch in // and to store them into one or multiples parquet files ?
默认情况下,生成的数据帧将跨集群分区,并作为每个分区的一个 Parquet 文件写出。如果需要,您可以通过重新分区来更改它,但这会导致随机播放并花费更长的时间。
我推荐 Learning Spark 书中的 'Level of Parallelism' 部分以供进一步阅读。