Pyspark - 基于嵌套结构过滤数据框

Pyspark - filter dataframe based on nested structs

假设我们有以下数据框模式

root
 |-- AUTHOR_ID: integer (nullable = false)
 |-- NAME: string (nullable = true)
 |-- Books: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- BOOK_ID: integer (nullable = false)
 |    |    |-- Chapters: array (nullable = true) 
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- NAME: string (nullable = true)
 |    |    |    |    |-- NUMBER_PAGES: integer (nullable = true)

谢谢

根据您的数据结构,给定 BOOK_IDNUMBER_PAGES 等于其每个章节的 NUMBER_PAGES 的总和。

您可以使用 aggregate 函数计算每本书的页数,然后使用带有 exists 函数的过滤器:

from pyspark.sql import functions as F

df1 = df.filter(
    F.exists(
        "Books",
        lambda x: F.aggregate(x["Chapters"], F.lit(0), lambda a, b: a + b) < F.lit(100)
    )
)

对于 Spark <3.1,您需要使用 expr 聚合和存在函数:

df1 = df.filter(    
    F.expr("exists(Book, x -> aggregate(x.Chapters, 0, (a, b) -> a + b) < 100)")
)