Flatten hierarchy table using PySpark

I am trying to flatten a hierarchy table into a fixed number of levels using PySpark 3.0 (Databricks).

Input data:

sc.parallelize([[0,None,'root','?'],[1,0,'a','aaaaa'], [2,1,'b','bbbbb'],[3,1,'c','ccccc'],[4,3,'d','ddddd'],[5,4,'e','eeeee']]).toDF(("id", "parent", "name", "attribute")).createOrReplaceTempView('df')
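This encodes the following tree (id:name):

    0:root
    └── 1:a
        ├── 2:b
        └── 3:c
            └── 4:d
                └── 5:e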

Desired output:
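Running the CTE query below against this input produces the table shown here (node e sits at depth 4, outside the three flattened levels, and is therefore dropped):

    LEAF_LEVEL | lvl0_id | lvl1_id | lvl1_name | lvl1_attribute | lvl2_id | lvl2_name | lvl2_attribute | lvl3_id | lvl3_name | lvl3_attribute
    0          | 0       | null    | null      | null           | null    | null      | null           | null    | null      | null
    1          | 0       | 1       | a         | aaaaa          | null    | null      | null           | null    | null      | null
    2          | 0       | 1       | a         | aaaaa          | 2       | b         | bbbbb          | null    | null      | null
    2          | 0       | 1       | a         | aaaaa          | 3       | c         | ccccc          | null    | null      | null
    3          | 0       | 1       | a         | aaaaa          | 3       | c         | ccccc          | 4       | d         | ddddd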

I already get this output with the CTE query below, but there must be a more concise way to write it. Any help is appreciated:

WITH lvl0
AS (
    SELECT 0 AS LEAF_LEVEL, df.id, df.parent, df.name, df.attribute AS lvl0_attribute
    FROM df
    WHERE df.parent IS NULL
    ), lvl1
AS (
    SELECT 1 AS LEAF_LEVEL, df.id, df.parent, df.name, df.attribute
    FROM df
    JOIN lvl0 ON lvl0.id = df.parent
    ), lvl2
AS (
    SELECT 2 AS LEAF_LEVEL, df.id, df.parent, df.name, df.attribute
    FROM df
    JOIN lvl1 ON lvl1.id = df.parent
    ), lvl3
AS (
    SELECT 3 AS LEAF_LEVEL, df.id, df.parent, df.name, df.attribute
    FROM df
    JOIN lvl2 ON lvl2.id = df.parent
    )
SELECT lvl0.LEAF_LEVEL, lvl0.id AS lvl0_id, NULL AS lvl1_id, NULL AS lvl1_name, NULL AS lvl1_attribute, NULL AS lvl2_id, NULL AS lvl2_name, NULL AS lvl2_attribute, NULL AS lvl3_id, NULL AS lvl3_name, NULL AS lvl3_attribute
FROM lvl0

UNION

SELECT lvl1.LEAF_LEVEL, lvl0.id AS lvl0_id, lvl1.id AS lvl1_id, lvl1.name AS lvl1_name, lvl1.attribute AS lvl1_attribute, NULL AS lvl2_id, NULL AS lvl2_name, NULL AS lvl2_attribute, NULL AS lvl3_id, NULL AS lvl3_name, NULL AS lvl3_attribute
FROM lvl0
INNER JOIN lvl1 ON lvl1.parent = lvl0.id

UNION

SELECT lvl2.LEAF_LEVEL, lvl0.id AS lvl0_id, lvl1.id AS lvl1_id, lvl1.name AS lvl1_name, lvl1.attribute AS lvl1_attribute, lvl2.id AS lvl2_id, lvl2.name AS lvl2_name, lvl2.attribute AS lvl2_attribute, NULL AS lvl3_id, NULL AS lvl3_name, NULL AS lvl3_attribute
FROM lvl0
INNER JOIN lvl1 ON lvl1.parent = lvl0.id
INNER JOIN lvl2 ON lvl2.parent = lvl1.id

UNION

SELECT lvl3.LEAF_LEVEL, lvl0.id AS lvl0_id, lvl1.id AS lvl1_id, lvl1.name AS lvl1_name, lvl1.attribute AS lvl1_attribute, lvl2.id AS lvl2_id, lvl2.name AS lvl2_name, lvl2.attribute AS lvl2_attribute, lvl3.id AS lvl3_id, lvl3.name AS lvl3_name, lvl3.attribute AS lvl3_attribute
FROM lvl0
INNER JOIN lvl1 ON lvl1.parent = lvl0.id
INNER JOIN lvl2 ON lvl2.parent = lvl1.id
INNER JOIN lvl3 ON lvl3.parent = lvl2.id
ORDER BY 1, 2, 3, 4, 5, 6 ASC
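For context, a minimal sketch of executing the statement above against the temp view (the names cte_query and flat_df are illustrative, not from the original post):

flat_df = spark.sql(cte_query)  # cte_query: a string holding the WITH ... SELECT statement above
flat_df.show(truncate=False)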

With PySpark you can write this in a more generic way, which makes it more concise.

For each level, join in the data from the next level as extra columns, and union the result with the rows that remain at the current level. You can do this with the add_next_level function defined below and reduce from functools:
from functools import reduce

from pyspark.sql import Row
from pyspark.sql.functions import coalesce, col, lit

data = sc.parallelize([[0,None,'root','?'],[1,0,'a','aaaaa'],[2,1,'b','bbbbb'],[3,1,'c','ccccc'],[4,3,'d','ddddd'],[5,4,'e','eeeee']]).toDF(("id", "parent", "name", "attribute"))


def add_next_level(df, level):
    # Left-join the next level's rows onto the current frame: rows that have
    # children are extended by one level and their LEAF_LEVEL advances; rows
    # without children get nulls and keep their existing LEAF_LEVEL.
    return df.join(
            data.select(
                lit(level).alias('next_level'),
                col('parent'),
                col('id').alias(f'lvl{level}_id'),
                col('name').alias(f'lvl{level}_name'),
                col('attribute').alias(f'lvl{level}_attribute')
            ), col(f'lvl{level - 1}_id') == col('parent'), 'left') \
        .withColumn('LEAF_LEVEL', coalesce(col('next_level'), col('LEAF_LEVEL'))) \
        .drop('parent', 'next_level') \
        .union( # also keep a row for each current-level node that does have children
            df.join(data, col(f'lvl{level - 1}_id') == col('parent'), 'left_semi')
                .where(col('LEAF_LEVEL') == lit(level - 1))
                .withColumn(f'lvl{level}_id', lit(None))
                .withColumn(f'lvl{level}_name', lit(None))
                .withColumn(f'lvl{level}_attribute', lit(None)))

# Start from the root row (LEAF_LEVEL 0, root id 0) and fold in levels 1 to 3
reduce(
    add_next_level,
    [1, 2, 3],
    spark.createDataFrame([Row(0, 0)], ['LEAF_LEVEL', 'lvl0_id'])
).sort('lvl1_id', 'lvl2_id', 'lvl3_id').show()
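Since Spark sorts nulls first in ascending order, the root row comes out on top, and the result matches the CTE output above. To flatten to a different depth, only the [1, 2, 3] list needs to change; note that the seed DataFrame hardcodes the root's id (0) and its level (0).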