从卡夫卡流pyspark中的嵌套json获取数据

Get data from nested json in kafka stream pyspark

我有一个kafka生产者以

的格式发送大量数据
{
  '1000': 
    {
       '3': 
        {
           'seq': '1', 
           'state': '2', 
           'CMD': 'XOR' 
        }
    },
 '1001': 
    {
       '5': 
        {
           'seq': '2', 
           'state': '2', 
           'CMD': 'OR' 
        }
    },
 '1003': 
    {
       '5': 
        {
           'seq': '3', 
           'state': '4', 
           'CMD': 'XOR' 
        }
    }
}

..... 我想要的数据在最后一个循环中:{'seq': '1', 'state': '2', 'CMD': 'XOR'} 上面循环中的键 ('1000' and '3') 是可变的。请注意,以上值仅供参考。原始数据集很大,有很多可变键。只有最后一个循环{'seq', 'state', 'CMD'}中的键是常量。

我曾尝试使用通用格式读取数据,但得到的数据不正确,因为上面的循环具有可变键,而且我不确定如何定义架构来解析这种格式的数据。

我要实现的输出是格式为

的数据帧
seq    state     CMD
----------------------
 1       2       XOR
 2       2        OR
 3       4       XOR

这可能是一个适合您的解决方案 - 使用 explode()getItem() 如下-

在此处将 json 加载到数据框中

a_json={
  '1000': 
    {
       '3': 
        {
           'seq': '1', 
           'state': '2', 
           'CMD': 'XOR' 
        }
    }
}
df = spark.createDataFrame([(a_json)])
df.show(truncate=False)

+-----------------------------------------+
|1000                                     |
+-----------------------------------------+
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|
+-----------------------------------------+

逻辑在这里

df = df.select("*", F.explode("1000").alias("x", "y"))
df = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df.show(truncate=False)


 +-----------------------------------------+---+----------------------------------+---+-----+---+
|1000                                     |x  |y                                 |seq|state|CMD|
+-----------------------------------------+---+----------------------------------+---+-----+---+
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|3  |[CMD -> XOR, state -> 2, seq -> 1]|1  |2    |XOR|
+-----------------------------------------+---+----------------------------------+---+-----+---+

根据进一步的输入更新代码

#Assuming that all the json columns are in a single column, hence making it an array column first.
df = df.withColumn("array_col", F.array("1000", "1001", "1003"))
#Then explode and getItem
df = df.withColumn("explod_col", F.explode("array_col"))
df = df.select("*", F.explode("explod_col").alias("x", "y"))
df_final = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df_final.select("seq","state","CMD").show()
|seq|state|CMD|
+---+-----+---+
|  1|    2|XOR|
|  2|    2| OR|
|  3|    4|XOR|
+---+-----+---+