pyspark dataframe maximum time at every folder level

I want to recursively find the maximum datetime value for every sub-folder, and finally the maximum timestamp for the top-level parent folder. Spark SQL becomes slow when I run it, so I would like to implement this logic with a UDF or DataFrame methods in PySpark.

+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
|         /A|  parent-Folder|               1|     null|
|       /A/B|     sub-folder|               2|     null|
| /A/B/1.txt|           file|               3| 02022021|
| /A/B/2.txt|           file|               4| 02032021|
|       /A/C|     sub-folder|               2|     null|
| /A/C/3.txt|           file|               3| 02042021|
| /A/C/4.txt|           file|               3| 02052021|
+-----------+---------------+----------------+---------+

The output should show the Timestamp column with the null values replaced by the maximum timestamp at each level.

Output

+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
|         /A|  parent-Folder| 02052021|
|       /A/B|     sub-folder| 02032021|
| /A/B/1.txt|           file| 02022021|
| /A/B/2.txt|           file| 02032021|
|       /A/C|     sub-folder| 02052021|
| /A/C/3.txt|           file| 02042021|
| /A/C/4.txt|           file| 02052021|
+-----------+---------------+---------+

SQL: the query I tried is below and it gives the expected result, but it is too slow when the dataframe has millions of records. I tried caching the dataframe, but it did not help; LIKE is probably the expensive operation. (FYI: I removed the time portion from the date format because it was not displaying correctly; the time format is not the issue here.) A sub-folder or parent folder should still get the maximum timestamp of the files under it.

df.show()
+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
|         /A|  parent-Folder|               1|     null|
|       /A/B|     sub-folder|               2|     null|
| /A/B/1.txt|           file|               3| 02022021|
| /A/B/2.txt|           file|               4| 02032021|
|       /A/C|     sub-folder|               2|     null|
| /A/C/3.txt|           file|               3| 02042021|
| /A/C/4.txt|           file|               3| 02052021|
+-----------+---------------+----------------+---------+
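The query below reads from src_table, i.e. the dataframe registered as a temporary view; a minimal setup sketch (the view name comes from the query, and the exact registration call depends on the Spark version):

df.createOrReplaceTempView("src_table")   # Spark 2.x+; use df.registerTempTable("src_table") on 1.x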

>>> self_join_rec =  sqlc.sql("SELECT   \
...     a.File_Folder,  a.Folder_File_Ind, Max(b.Timestamp) Timestamp  \
...     FROM src_table a \
...     JOIN src_table b on b.File_Folder LIKE Concat(a.File_Folder, '%')   \
...     GROUP BY \
...     a.File_Folder,  a.Folder_File_Ind \
...     ORDER BY a.File_Folder,a.Folder_File_Ind"
... )
>>> self_join_rec.show()
+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
|         /A|  parent-Folder| 02052021|
|       /A/B|     sub-folder| 02032021|
| /A/B/1.txt|           file| 02022021|
| /A/B/2.txt|           file| 02032021|
|       /A/C|     sub-folder| 02052021|
| /A/C/3.txt|           file| 02042021|
| /A/C/4.txt|           file| 02052021|
+-----------+---------------+---------+
You can do this with DataFrame operations in a few steps:

  1. Add a column base_folder that contains only the folder part (without the file name); it will be used for the join
  2. Group by base_folder and compute the maximum timestamp
  3. Join the result back to the original dataframe on base_folder and take the maximum timestamp for the rows where it is null

from pyspark.sql import functions as F

# create new column base_folder
df = df.withColumn(
    "base_folder",
    F.when(
        F.col("Folder_File_Ind") == "file",
        F.regexp_extract("File_Folder", "(.*)/.*", 1)
    ).otherwise(F.col("File_Folder"))
)
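# For the sample data above, base_folder would be, e.g.:
#   /A/B/1.txt -> /A/B   (file rows keep only their parent folder)
#   /A/B       -> /A/B   (folder and sub-folder rows keep their own path)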

df.cache()

# calculate max timestamp per group of folders
max_df = df.groupby("base_folder").agg(F.max("Timestamp").alias("max_timestamp")).filter("max_timestamp is not null")

# join df with max_df: for each row, pick up every max_df base_folder that is a
# descendant of (or equal to) the row's base_folder, then keep the overall max per File_Folder
df1 = df.alias("df").join(
    max_df.alias("max"),
    F.col("max.base_folder").startswith(F.col("df.base_folder")),
    "left"
).groupby("File_Folder").agg(
    F.first("Folder_File_Ind").alias("Folder_File_Ind"),
    F.first("folder_level_ind").alias("folder_level_ind"),
    F.coalesce(
        F.first("Timestamp"),
        F.max("max_timestamp")
    ).alias("Timestamp")
)

df1.show()

#+-----------+---------------+----------------+-----------------+
#|File_Folder|Folder_File_Ind|folder_level_ind|        Timestamp|
#+-----------+---------------+----------------+-----------------+
#|         /A| parent-Folder |               1| 02-FEB-2021 9 PM|
#| /A/C/4.txt|           file|               3|02-FEB-2021 11 AM|
#| /A/B/2.txt|           file|               3| 02-FEB-2021 9 PM|
#|       /A/C|    sub-folder |               2|02-FEB-2021 11 AM|
#| /A/C/3.txt|           file|               3|02-FEB-2021 11 AM|
#|       /A/B|     sub-folder|               2| 02-FEB-2021 9 PM|
#| /A/B/1.txt|           file|               3| 02-FEB-2021 9 PM|
#+-----------+---------------+----------------+-----------------+
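The groupBy output above comes back unordered; if the same ordering as the expected result is wanted, a sort can be appended (a small addition, not part of the original snippet):

df1.orderBy("File_Folder").show(truncate=False)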

Or, using a SQL query with CTEs:

sql_query = """
WITH folders_data AS (
    SELECT  *,
            CASE WHEN Folder_File_Ind = 'file' THEN regexp_extract(File_Folder, '(.*)/.*', 1)
                 ELSE File_Folder
            END AS base_folder
    FROM    src_table
), 
max_per_folder AS (
    SELECT  base_folder,
            MAX(Timestamp) AS max_timestamp
    FROM    folders_data
    GROUP BY  base_folder
    HAVING  MAX(Timestamp) IS NOT NULL
)

SELECT  File_Folder,
        FIRST(Folder_File_Ind)      AS Folder_File_Ind,
        FIRST(folder_level_ind)     AS folder_level_ind,
        COALESCE(FIRST(Timestamp), MAX(max_timestamp))  AS Timestamp
FROM    folders_data AS t1
LEFT JOIN max_per_folder t2
ON      t2.base_folder LIKE CONCAT(t1.base_folder, '%')
GROUP BY File_Folder
"""

spark.sql(sql_query).show()
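End to end, the SQL variant would be run roughly like this; the view registration mirrors the assumption made for the DataFrame version, and the final sort is only for readable output:

df.createOrReplaceTempView("src_table")                        # expose the dataframe as src_table
spark.sql(sql_query).orderBy("File_Folder").show(truncate=False)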