Pyspark groupby with udf: poor performance on local machine
I am trying to run some analysis on a huge dataset made of several daily files, each about 15 GB.
To iterate faster, and for testing purposes only, I built a very small dataset that covers all the relevant scenarios.
I have to check, for each user, that the sequence of actions is correct (i.e. something like a log or an audit trail).
To do that I defined a udf and then applied a groupby.
Below is the code that reproduces my use case:
import pyspark
from pyspark import SparkContext
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import time
sc = SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession.builder.appName('example').getOrCreate()
d = spark.createDataFrame(
[(133515, "user1", 100, 'begin'),
(133515, "user1", 125, 'ok'),
(133515, "user1", 150, 'ok'),
(133515, "user1", 200, 'end'),
(133515, "user1", 250, 'begin'),
(133515, "user1", 300, 'end'),
(133515, "user1", 310, 'begin'),
(133515, "user1", 335, 'ok'),
(133515, "user1", 360, 'ok'),
# user1 missing END and STOPPED
(789456, "user2", 150, 'begin'),
(789456, "user2", 175, 'ok'),
(789456, "user2", 200, 'end'),
# user2 stopped
(712346, "user3", 100, 'begin'),
(712346, "user3", 125, 'ok'),
(712346, "user3", 150, 'ok'),
(712346, "user3", 200, 'end'),
#user3 stopped
(789456, "user4", 150, 'begin'),
(789456, "user4", 300, 'end'),
(789456, "user4", 350, 'begin'),
(789456, "user4", 375, 'ok'),
(789456, "user4", 450, 'end'),
(789456, "user4", 475, 'ok'),
#user4 missing BEGIN but ALIVE
], ("ID", "user", "epoch", "ACTION")).orderBy(F.col('epoch'))
d.show()
# udf that zips the two collected lists into [epoch, ACTION] pairs
zip_lists = F.udf(lambda x, y: [list(z) for z in zip(x, y)], ArrayType(StringType()))
start = time.time()
d2 = d.groupBy(F.col('ID'), F.col('user'))\
.agg(zip_lists(F.collect_list('epoch'), F.collect_list('ACTION')).alias('couples'))
d2.show(50, False)
end = time.time()
print(end-start)
This gives me the following result:
+------+-----+--------------------------------------------------------------------------------------------------------------+
|ID |user |couples |
+------+-----+--------------------------------------------------------------------------------------------------------------+
|789456|user4|[[150, begin], [300, end], [350, begin], [375, ok], [450, end], [475, ok]] |
|712346|user3|[[100, begin], [125, ok], [150, ok], [200, end]] |
|133515|user1|[[100, begin], [125, ok], [150, ok], [200, end], [250, begin], [300, end], [310, begin], [335, ok], [360, ok]]|
|789456|user2|[[150, begin], [175, ok], [200, end]] |
+------+-----+--------------------------------------------------------------------------------------------------------------+
189.9082863330841
Isn't it way too slow?
I am working on a modern laptop with conda; I installed pyspark through the conda navigator.
Am I doing something wrong? That is far too long for such a small dataset.
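Two things typically matter for a tiny test like this on a local machine: the plain Python udf forces Spark to serialize every group out to a Python worker and back, and the default shuffle configuration creates 200 partitions even for a handful of rows (part of the ~190 s is probably also the warm-up of the first job on a fresh session). A minimal sketch of how to check both; the partition count of 4 is an arbitrary small value, not something taken from the run above:
# Inspect the physical plan: with a plain Python udf the grouped data is shipped
# to a Python worker and back, which normally shows up as a BatchEvalPython step.
d2.explain()
# Shrink the shuffle partitions for a tiny local test; Spark defaults to 200,
# which adds scheduling overhead per partition. The value 4 is an arbitrary choice.
spark.conf.set("spark.sql.shuffle.partitions", "4")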
Instead of aggregating over two columns with the udf, I tried to create a new struct column and collect over it:
start=time.time()
d2 = d.groupBy(F.col('ID'), F.col('user'))\
.agg(zip_lists(F.collect_list('epoch'), F.collect_list('ACTION')).alias('couples'))\
.collect()
end = time.time()
print('first solution:', end-start)
start = time.time()
d3 = d.select(d.ID, d.user, F.struct([d.epoch, d.ACTION]).alias('couple'))
d4 = d3.groupBy(d3.ID, d3.user)\
.agg(F.collect_list(d3.couple).alias('couples'))\
.collect()
end = time.time()
print('second solution:', end-start)
On my machine this change gives noticeably better results! :D
first solution: 2.247227907180786
second solution: 0.8280930519104004
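One caveat with both versions: collect_list gives no ordering guarantee once the data has been shuffled, so the couples are not necessarily sorted by epoch even though the input DataFrame was ordered. A minimal sketch of how the second solution could be made order-safe, relying on the fact that sort_array on an array of structs sorts by the first struct field (here epoch); d5 is just an illustrative name:
# Sort the collected (epoch, ACTION) structs by epoch inside each group.
d5 = d.groupBy(d.ID, d.user)\
    .agg(F.sort_array(F.collect_list(F.struct(d.epoch, d.ACTION))).alias('couples'))
d5.show(50, False)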