pyspark dataframe maximum time at every folder level
I want to recursively find the maximum date-time value for every sub-folder and, finally, the maximum timestamp for the top-level parent folder.
Spark SQL becomes slow when I run it, so I would like to implement this logic with a UDF or DataFrame methods in PySpark.
+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
| /A| parent-Folder| 1| null|
| /A/B| sub-folder| 2| null|
| /A/B/1.txt| file| 3| 02022021|
| /A/B/2.txt| file| 4| 02032021|
| /A/C| sub-folder| 2| null|
| /A/C/3.txt| file| 3| 02042021|
| /A/C/4.txt| file| 3| 02052021|
+-----------+---------------+----------------+---------+
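For reference, a minimal sketch of how this sample DataFrame could be built for testing (column names taken from the table above; keeping Timestamp as a string is an assumption about the source data):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample rows matching the table above; None marks the missing folder timestamps
data = [
    ("/A",         "parent-Folder", 1, None),
    ("/A/B",       "sub-folder",    2, None),
    ("/A/B/1.txt", "file",          3, "02022021"),
    ("/A/B/2.txt", "file",          4, "02032021"),
    ("/A/C",       "sub-folder",    2, None),
    ("/A/C/3.txt", "file",          3, "02042021"),
    ("/A/C/4.txt", "file",          3, "02052021"),
]
df = spark.createDataFrame(
    data, ["File_Folder", "Folder_File_Ind", "folder_level_ind", "Timestamp"]
)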
The output should show the Timestamp values, with the nulls replaced by the maximum timestamp at each level.
Output
+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
| /A| parent-Folder| 02052021|
| /A/B| sub-folder| 02032021|
| /A/B/1.txt| file| 02022021|
| /A/B/2.txt| file| 02032021|
| /A/C| sub-folder| 02052021|
| /A/C/3.txt| file| 02042021|
| /A/C/4.txt| file| 02052021|
+-----------+---------------+---------+
SQL: The SQL I tried below gives the expected result, but it is too slow when the DataFrame has millions of records. I tried caching the DataFrame but it did not help; LIKE is probably a costly operation. (FYI: I removed the time portion from the date format because it was not displaying properly; the time format is not the issue here.) The sub-folders and the parent folder should still get the maximum timestamp of their contents.
df.show()
+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
| /A| parent-Folder| 1| null|
| /A/B| sub-folder| 2| null|
| /A/B/1.txt| file| 3| 02022021|
| /A/B/2.txt| file| 4| 02032021|
| /A/C| sub-folder| 2| null|
| /A/C/3.txt| file| 3| 02042021|
| /A/C/4.txt| file| 3| 02052021|
+-----------+---------------+----------------+---------+
>>> self_join_rec = sqlc.sql("SELECT \
... a.File_Folder, a.Folder_File_Ind, Max(b.Timestamp) Timestamp \
... FROM src_table a \
... JOIN src_table b on b.File_Folder LIKE Concat(a.File_Folder, '%') \
... GROUP BY \
... a.File_Folder, a.Folder_File_Ind \
... ORDER BY a.File_Folder,a.Folder_File_Ind"
... )
>>> self_join_rec.show()
+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
| /A| parent-Folder| 02052021|
| /A/B| sub-folder| 02032021|
| /A/B/1.txt| file| 02022021|
| /A/B/2.txt| file| 02032021|
| /A/C| sub-folder| 02052021|
| /A/C/3.txt| file| 02042021|
| /A/C/4.txt| file| 02052021|
+-----------+---------------+---------+
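One way to see why this query is slow is to inspect the physical plan: a LIKE-based self-join is a non-equi join, which Spark typically executes as a broadcast nested-loop or cartesian-style join over all row pairs. A quick check, assuming the query above is held in self_join_rec:
# Print the physical plan; a BroadcastNestedLoopJoin/CartesianProduct node explains the cost
self_join_rec.explain()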
- Add a column base_folder that contains only the folder part (without the file name); it will be used for the join
- Group by base_folder and compute the maximum timestamp
- Join back to the original DataFrame on base_folder and take the maximum timestamp for the rows where it is null, as in the code below
from pyspark.sql import functions as F
# create new column base_folder
df = df.withColumn(
"base_folder",
F.when(
F.col("Folder_File_Ind") == "file",
F.regexp_extract("File_Folder", "(.*)/.*", 1)
).otherwise(F.col("File_Folder"))
)
df.cache()
# calculate max timestamp per group of folders
max_df = df.groupby("base_folder").agg(F.max("Timestamp").alias("max_timestamp")).filter("max_timestamp is not null")
# join df with max_df
df1 = df.alias("df").join(
max_df.alias("max"),
F.col("max.base_folder").startswith(F.col("df.base_folder")),
"left"
).groupby("File_Folder").agg(
F.first("Folder_File_Ind").alias("Folder_File_Ind"),
F.first("folder_level_ind").alias("folder_level_ind"),
F.coalesce(
F.first("Timestamp"),
F.max("max_timestamp")
).alias("Timestamp")
)
df1.show()
#+-----------+---------------+----------------+-----------------+
#|File_Folder|Folder_File_Ind|folder_level_ind| Timestamp|
#+-----------+---------------+----------------+-----------------+
#| /A| parent-Folder | 1| 02-FEB-2021 9 PM|
#| /A/C/4.txt| file| 3|02-FEB-2021 11 AM|
#| /A/B/2.txt| file| 3| 02-FEB-2021 9 PM|
#| /A/C| sub-folder | 2|02-FEB-2021 11 AM|
#| /A/C/3.txt| file| 3|02-FEB-2021 11 AM|
#| /A/B| sub-folder| 2| 02-FEB-2021 9 PM|
#| /A/B/1.txt| file| 3| 02-FEB-2021 9 PM|
#+-----------+---------------+----------------+-----------------+
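Note that MAX here compares the Timestamp values as strings; if they are stored in an MMddyyyy format and span several months or years, the lexicographic order is not chronological. A minimal sketch, assuming the column really is an MMddyyyy string, converts it to a date first so the aggregation orders correctly:
from pyspark.sql import functions as F

# Parse the MMddyyyy string into a proper date so MAX is chronological
df = df.withColumn("Timestamp", F.to_date("Timestamp", "MMddyyyy"))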
Or, using a SQL query with CTEs:
sql_query = """
WITH folders_data AS (
SELECT *,
CASE WHEN Folder_File_Ind = 'file' THEN regexp_extract(File_Folder, '(.*)/.*', 1)
ELSE File_Folder
END AS base_folder
FROM src_table
),
max_per_folder AS (
SELECT base_folder,
MAX(Timestamp) AS max_timestamp
FROM folders_data
GROUP BY base_folder
HAVING MAX(Timestamp) IS NOT NULL
)
SELECT File_Folder,
FIRST(Folder_File_Ind) AS Folder_File_Ind,
FIRST(folder_level_ind) AS folder_level_ind,
COALESCE(FIRST(Timestamp), MAX(max_timestamp)) AS Timestamp
FROM folders_data AS t1
LEFT JOIN max_per_folder t2
ON t2.base_folder LIKE CONCAT(t1.base_folder, '%')
GROUP BY File_Folder
"""
spark.sql(sql_query).show()
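Note that this SQL variant assumes the DataFrame has already been registered as the src_table temporary view, e.g.:
# Register the DataFrame so the query above can reference src_table
df.createOrReplaceTempView("src_table")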