Pyspark - 基于嵌套结构过滤数据框
Pyspark - filter dataframe based on nested structs
假设我们有以下数据框模式
root
|-- AUTHOR_ID: integer (nullable = false)
|-- NAME: string (nullable = true)
|-- Books: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- BOOK_ID: integer (nullable = false)
| | |-- Chapters: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- NAME: string (nullable = true)
| | | | |-- NUMBER_PAGES: integer (nullable = true)
- 如何找到拥有
NUMBER_PAGES < 100
书籍的作者
谢谢
根据您的数据结构,给定 BOOK_ID
的 NUMBER_PAGES
等于其每个章节的 NUMBER_PAGES
的总和。
您可以使用 aggregate
函数计算每本书的页数,然后使用带有 exists
函数的过滤器:
from pyspark.sql import functions as F
df1 = df.filter(
F.exists(
"Books",
lambda x: F.aggregate(x["Chapters"], F.lit(0), lambda a, b: a + b) < F.lit(100)
)
)
对于 Spark <3.1,您需要使用 expr
聚合和存在函数:
df1 = df.filter(
F.expr("exists(Book, x -> aggregate(x.Chapters, 0, (a, b) -> a + b) < 100)")
)
假设我们有以下数据框模式
root
|-- AUTHOR_ID: integer (nullable = false)
|-- NAME: string (nullable = true)
|-- Books: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- BOOK_ID: integer (nullable = false)
| | |-- Chapters: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- NAME: string (nullable = true)
| | | | |-- NUMBER_PAGES: integer (nullable = true)
- 如何找到拥有
NUMBER_PAGES < 100
书籍的作者
谢谢
根据您的数据结构,给定 BOOK_ID
的 NUMBER_PAGES
等于其每个章节的 NUMBER_PAGES
的总和。
您可以使用 aggregate
函数计算每本书的页数,然后使用带有 exists
函数的过滤器:
from pyspark.sql import functions as F
df1 = df.filter(
F.exists(
"Books",
lambda x: F.aggregate(x["Chapters"], F.lit(0), lambda a, b: a + b) < F.lit(100)
)
)
对于 Spark <3.1,您需要使用 expr
聚合和存在函数:
df1 = df.filter(
F.expr("exists(Book, x -> aggregate(x.Chapters, 0, (a, b) -> a + b) < 100)")
)