循环中的 Pyspark 惰性评估太慢
Pyspark lazy evaluation in loops too slow
首先,我想让你知道,我对 spark 还是很陌生,并且习惯了惰性求值的概念。
这是我的问题:
我有两个通过读取 CSV.GZ 文件加载的 spark DataFrame。
我想做的是合并两个 table,以便根据我在第二个 table 上的键拆分第一个 table。
例如:
Table一个
+----------+---------+--------+---------+------+
| Date| Zone| X| Type|Volume|
+----------+---------+--------+---------+------+
|2019-01-16|010010000| B| A| 684|
|2019-01-16|010020000| B| A| 21771|
|2019-01-16|010030000| B| A| 7497|
|2019-01-16|010040000| B| A| 74852|
Table B
+----+---------+
|Dept| Zone|
+----+---------+
| 01|010010000|
| 02|010020000|
| 01|010030000|
| 02|010040000|
然后,当我合并两个 table 时,我有:
+---------+----------+--------+---------+------+----+
| Zone| Date| X| Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010010000|2019-01-16| B| A| 684| 01|
|010020000|2019-01-16| B| A| 21771| 02|
|010030000|2019-01-16| B| A| 7497| 01|
|010040000|2019-01-16| B| A| 74852| 02|
所以我想做的是将这个 table 拆分成 Y 个不连贯的 tables,其中 Y 是我在合并 [ 中找到的不同 'Dept' 值的数量=46=].
例如:
结果 1:
+---------+----------+--------+---------+------+----+
| Zone| Date| X| Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010010000|2019-01-16| B| A| 684| 01|
|010030000|2019-01-16| B| A| 7497| 01|
结果 2:
+---------+----------+--------+---------+------+----+
| Zone| Date| X| Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010020000|2019-01-16| B| A| 21771| 02|
|010040000|2019-01-16| B| A| 74852| 02|
我的代码如下所示:
sp_df_A = spark.read.csv(file_path_A, header=True, sep=';', encoding='cp1252')
sp_df_B = spark.read.csv(file_path_B, header=True, sep=';', encoding='cp1252')
sp_merged_df = sp_df_A.join(sp_df_B, on=['Zone'], how='left')
# list of unique 'Dept' values on the merged DataFrame
unique_buckets = [x.__getitem__('Dept') for x in sp_merged_df.select('Dept').distinct().collect()]
# Iterate over all 'Dept' found
for zone_bucket in unique_buckets:
print(zone_bucket)
bucket_dir = os.path.join(output_dir, 'Zone_%s' % zone_bucket)
if not os.path.exists(bucket_dir):
os.mkdir(bucket_dir)
# Filter target 'Dept'
tmp_df = sp_merged_df.filter(sp_merged_df['Dept'] == zone_bucket)
# write result
tmp_df.write.format('com.databricks.spark.csv').option('codec', 'org.apache.hadoop.io.compress.GzipCodec').save(bucket_dir, header = 'true')
问题是这个非常简单的代码写出结果要花太多时间。所以我的猜测是惰性求值是在循环的每个周期加载、合并和过滤。
可以这样吗?
你猜对了。您的代码读取、连接和过滤每个存储桶的所有数据。这确实是spark懒惰求值造成的。
Spark 等待任何数据转换,直到执行操作。当一个动作被调用时,spark 会查看所有的转换并创建一个关于如何有效地获得动作结果的计划。当 spark 执行这个计划时,程序会保持不变。当 spark 完成后,程序将继续并 spark "forgets" 关于它所做的一切,直到调用下一个动作。
在你的例子中,spark "forgets" 连接的数据帧 sp_merged_df
并且每次调用 .collect()
或 .save()
时它都会重建它。
如果你想要 spark 到 "remember" 一个 RDD 或 DataFrame,你可以 .cache()
它(参见 docs)。
首先,我想让你知道,我对 spark 还是很陌生,并且习惯了惰性求值的概念。
这是我的问题:
我有两个通过读取 CSV.GZ 文件加载的 spark DataFrame。 我想做的是合并两个 table,以便根据我在第二个 table 上的键拆分第一个 table。
例如:
Table一个
+----------+---------+--------+---------+------+
| Date| Zone| X| Type|Volume|
+----------+---------+--------+---------+------+
|2019-01-16|010010000| B| A| 684|
|2019-01-16|010020000| B| A| 21771|
|2019-01-16|010030000| B| A| 7497|
|2019-01-16|010040000| B| A| 74852|
Table B
+----+---------+
|Dept| Zone|
+----+---------+
| 01|010010000|
| 02|010020000|
| 01|010030000|
| 02|010040000|
然后,当我合并两个 table 时,我有:
+---------+----------+--------+---------+------+----+
| Zone| Date| X| Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010010000|2019-01-16| B| A| 684| 01|
|010020000|2019-01-16| B| A| 21771| 02|
|010030000|2019-01-16| B| A| 7497| 01|
|010040000|2019-01-16| B| A| 74852| 02|
所以我想做的是将这个 table 拆分成 Y 个不连贯的 tables,其中 Y 是我在合并 [ 中找到的不同 'Dept' 值的数量=46=].
例如:
结果 1:
+---------+----------+--------+---------+------+----+
| Zone| Date| X| Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010010000|2019-01-16| B| A| 684| 01|
|010030000|2019-01-16| B| A| 7497| 01|
结果 2:
+---------+----------+--------+---------+------+----+
| Zone| Date| X| Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010020000|2019-01-16| B| A| 21771| 02|
|010040000|2019-01-16| B| A| 74852| 02|
我的代码如下所示:
sp_df_A = spark.read.csv(file_path_A, header=True, sep=';', encoding='cp1252')
sp_df_B = spark.read.csv(file_path_B, header=True, sep=';', encoding='cp1252')
sp_merged_df = sp_df_A.join(sp_df_B, on=['Zone'], how='left')
# list of unique 'Dept' values on the merged DataFrame
unique_buckets = [x.__getitem__('Dept') for x in sp_merged_df.select('Dept').distinct().collect()]
# Iterate over all 'Dept' found
for zone_bucket in unique_buckets:
print(zone_bucket)
bucket_dir = os.path.join(output_dir, 'Zone_%s' % zone_bucket)
if not os.path.exists(bucket_dir):
os.mkdir(bucket_dir)
# Filter target 'Dept'
tmp_df = sp_merged_df.filter(sp_merged_df['Dept'] == zone_bucket)
# write result
tmp_df.write.format('com.databricks.spark.csv').option('codec', 'org.apache.hadoop.io.compress.GzipCodec').save(bucket_dir, header = 'true')
问题是这个非常简单的代码写出结果要花太多时间。所以我的猜测是惰性求值是在循环的每个周期加载、合并和过滤。
可以这样吗?
你猜对了。您的代码读取、连接和过滤每个存储桶的所有数据。这确实是spark懒惰求值造成的。
Spark 等待任何数据转换,直到执行操作。当一个动作被调用时,spark 会查看所有的转换并创建一个关于如何有效地获得动作结果的计划。当 spark 执行这个计划时,程序会保持不变。当 spark 完成后,程序将继续并 spark "forgets" 关于它所做的一切,直到调用下一个动作。
在你的例子中,spark "forgets" 连接的数据帧 sp_merged_df
并且每次调用 .collect()
或 .save()
时它都会重建它。
如果你想要 spark 到 "remember" 一个 RDD 或 DataFrame,你可以 .cache()
它(参见 docs)。