Pyspark 结构化流式传输 - 来自 2 个嵌套 JSON 的联合数据
Pyspark structured streaming - Union data from 2 nested JSON
我有 2 个 kafka 流数据帧。火花模式如下所示:
root
|-- key: string (nullable = true)
|-- pmudata1: struct (nullable = true)
| |-- pmu_id: byte (nullable = true)
| |-- time: timestamp (nullable = true)
| |-- stream_id: byte (nullable = true)
| |-- stat: string (nullable = true)
和
root
|-- key: string (nullable = true)
|-- pmudata2: struct (nullable = true)
| |-- pmu_id: byte (nullable = true)
| |-- time: timestamp (nullable = true)
| |-- stream_id: byte (nullable = true)
| |-- stat: string (nullable = true)
如何合并来自特定批次 window 的两个流中的所有行?列在两个流中的位置相同。
每个流都有不同的 pmu_id
值,因此我可以根据该值区分记录。
UnionByName
或 union
从单个数据帧生成流。
我想我需要分解列名,类似于 this 但这是针对 scala 的。
有没有办法在列中自动分解整个 JSON 并将它们合并?
您只能对数组和地图类型使用 explode
函数。在您的情况下,列 pmudata2
的类型为 StructType,因此只需像这样使用星号 *
到 select 全部 sub-fields:
df1 = df.selectExpr("key", "pmudata2.*")
#root
#|-- key: string (nullable = true)
#|-- pmu_id: byte (nullable = true)
#|-- time: timestamp (nullable = true)
#|-- stream_id: byte (nullable = true)
#|-- stat: string (nullable = true)
我有 2 个 kafka 流数据帧。火花模式如下所示:
root
|-- key: string (nullable = true)
|-- pmudata1: struct (nullable = true)
| |-- pmu_id: byte (nullable = true)
| |-- time: timestamp (nullable = true)
| |-- stream_id: byte (nullable = true)
| |-- stat: string (nullable = true)
和
root
|-- key: string (nullable = true)
|-- pmudata2: struct (nullable = true)
| |-- pmu_id: byte (nullable = true)
| |-- time: timestamp (nullable = true)
| |-- stream_id: byte (nullable = true)
| |-- stat: string (nullable = true)
如何合并来自特定批次 window 的两个流中的所有行?列在两个流中的位置相同。
每个流都有不同的 pmu_id
值,因此我可以根据该值区分记录。
UnionByName
或 union
从单个数据帧生成流。
我想我需要分解列名,类似于 this 但这是针对 scala 的。 有没有办法在列中自动分解整个 JSON 并将它们合并?
您只能对数组和地图类型使用 explode
函数。在您的情况下,列 pmudata2
的类型为 StructType,因此只需像这样使用星号 *
到 select 全部 sub-fields:
df1 = df.selectExpr("key", "pmudata2.*")
#root
#|-- key: string (nullable = true)
#|-- pmu_id: byte (nullable = true)
#|-- time: timestamp (nullable = true)
#|-- stream_id: byte (nullable = true)
#|-- stat: string (nullable = true)