从卡夫卡流pyspark中的嵌套json获取数据
Get data from nested json in kafka stream pyspark
我有一个kafka生产者以
的格式发送大量数据
{
'1000':
{
'3':
{
'seq': '1',
'state': '2',
'CMD': 'XOR'
}
},
'1001':
{
'5':
{
'seq': '2',
'state': '2',
'CMD': 'OR'
}
},
'1003':
{
'5':
{
'seq': '3',
'state': '4',
'CMD': 'XOR'
}
}
}
.....
我想要的数据在最后一个循环中:{'seq': '1', 'state': '2', 'CMD': 'XOR'}
上面循环中的键 ('1000' and '3')
是可变的。请注意,以上值仅供参考。原始数据集很大,有很多可变键。只有最后一个循环{'seq', 'state', 'CMD'}
中的键是常量。
我曾尝试使用通用格式读取数据,但得到的数据不正确,因为上面的循环具有可变键,而且我不确定如何定义架构来解析这种格式的数据。
我要实现的输出是格式为
的数据帧
seq state CMD
----------------------
1 2 XOR
2 2 OR
3 4 XOR
这可能是一个适合您的解决方案 - 使用 explode()
和 getItem()
如下-
在此处将 json 加载到数据框中
a_json={
'1000':
{
'3':
{
'seq': '1',
'state': '2',
'CMD': 'XOR'
}
}
}
df = spark.createDataFrame([(a_json)])
df.show(truncate=False)
+-----------------------------------------+
|1000 |
+-----------------------------------------+
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|
+-----------------------------------------+
逻辑在这里
df = df.select("*", F.explode("1000").alias("x", "y"))
df = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df.show(truncate=False)
+-----------------------------------------+---+----------------------------------+---+-----+---+
|1000 |x |y |seq|state|CMD|
+-----------------------------------------+---+----------------------------------+---+-----+---+
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|3 |[CMD -> XOR, state -> 2, seq -> 1]|1 |2 |XOR|
+-----------------------------------------+---+----------------------------------+---+-----+---+
根据进一步的输入更新代码
#Assuming that all the json columns are in a single column, hence making it an array column first.
df = df.withColumn("array_col", F.array("1000", "1001", "1003"))
#Then explode and getItem
df = df.withColumn("explod_col", F.explode("array_col"))
df = df.select("*", F.explode("explod_col").alias("x", "y"))
df_final = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df_final.select("seq","state","CMD").show()
|seq|state|CMD|
+---+-----+---+
| 1| 2|XOR|
| 2| 2| OR|
| 3| 4|XOR|
+---+-----+---+
我有一个kafka生产者以
的格式发送大量数据{
'1000':
{
'3':
{
'seq': '1',
'state': '2',
'CMD': 'XOR'
}
},
'1001':
{
'5':
{
'seq': '2',
'state': '2',
'CMD': 'OR'
}
},
'1003':
{
'5':
{
'seq': '3',
'state': '4',
'CMD': 'XOR'
}
}
}
.....
我想要的数据在最后一个循环中:{'seq': '1', 'state': '2', 'CMD': 'XOR'}
上面循环中的键 ('1000' and '3')
是可变的。请注意,以上值仅供参考。原始数据集很大,有很多可变键。只有最后一个循环{'seq', 'state', 'CMD'}
中的键是常量。
我曾尝试使用通用格式读取数据,但得到的数据不正确,因为上面的循环具有可变键,而且我不确定如何定义架构来解析这种格式的数据。
我要实现的输出是格式为
的数据帧seq state CMD
----------------------
1 2 XOR
2 2 OR
3 4 XOR
这可能是一个适合您的解决方案 - 使用 explode()
和 getItem()
如下-
在此处将 json 加载到数据框中
a_json={
'1000':
{
'3':
{
'seq': '1',
'state': '2',
'CMD': 'XOR'
}
}
}
df = spark.createDataFrame([(a_json)])
df.show(truncate=False)
+-----------------------------------------+
|1000 |
+-----------------------------------------+
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|
+-----------------------------------------+
逻辑在这里
df = df.select("*", F.explode("1000").alias("x", "y"))
df = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df.show(truncate=False)
+-----------------------------------------+---+----------------------------------+---+-----+---+
|1000 |x |y |seq|state|CMD|
+-----------------------------------------+---+----------------------------------+---+-----+---+
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|3 |[CMD -> XOR, state -> 2, seq -> 1]|1 |2 |XOR|
+-----------------------------------------+---+----------------------------------+---+-----+---+
根据进一步的输入更新代码
#Assuming that all the json columns are in a single column, hence making it an array column first.
df = df.withColumn("array_col", F.array("1000", "1001", "1003"))
#Then explode and getItem
df = df.withColumn("explod_col", F.explode("array_col"))
df = df.select("*", F.explode("explod_col").alias("x", "y"))
df_final = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df_final.select("seq","state","CMD").show()
|seq|state|CMD|
+---+-----+---+
| 1| 2|XOR|
| 2| 2| OR|
| 3| 4|XOR|
+---+-----+---+