如何旋转 pyspark 流数据帧
How to pivot a pyspark streaming dataframe
我在 pyspark 结构化流中接收流数据,我需要对它们进行透视,以便我可以从该数据中获得一行。
进入我的集群的数据结构是:
{
"version": 1.0.0,
"message": {
"data": [{
"name": "name_1",
"value": 1.0},
...
{
"name": "name_2",
"value": 2.0}]
}
}
我的代码如下:
dfStreaming = spark \
.readStream \
.format("eventhubs") \
.options(**optionConf()) \
.load() \
.select(explode("message.data").alias("data")) \
.select(("data.*")) \
我得到以下结果数据框:
|---------------------|------------------|
| Name | Value |
|---------------------|------------------|
| Name_1 | 1.0 |
|---------------------|------------------|
| Name_2 | 2.0 |
|---------------------|------------------|
但我需要以下结构(它实际上是 table 的一个枢轴):
|---------------------|------------------|
| Name_1 | Name_2 |
|---------------------|------------------|
| 1.0 | 2.0 |
|---------------------|------------------|
流式数据帧上的枢轴是不允许的,但我想应该有一个解决方案。
非常感谢您的帮助。
解决方案是在重新创建数据框行的情况下添加多个聚合。
dfStreaming = spark \
.readStream \
.format("eventhubs") \
.options(**optionConf()) \
.load() \
.select(explode("message.data").alias("data")) \
.select(("data.*")) \
.selectexpr(["sum(case when Name=Name_of_desired_column then Value else null) as Name_of_desired_column"])
我在 pyspark 结构化流中接收流数据,我需要对它们进行透视,以便我可以从该数据中获得一行。
进入我的集群的数据结构是:
{
"version": 1.0.0,
"message": {
"data": [{
"name": "name_1",
"value": 1.0},
...
{
"name": "name_2",
"value": 2.0}]
}
}
我的代码如下:
dfStreaming = spark \
.readStream \
.format("eventhubs") \
.options(**optionConf()) \
.load() \
.select(explode("message.data").alias("data")) \
.select(("data.*")) \
我得到以下结果数据框:
|---------------------|------------------|
| Name | Value |
|---------------------|------------------|
| Name_1 | 1.0 |
|---------------------|------------------|
| Name_2 | 2.0 |
|---------------------|------------------|
但我需要以下结构(它实际上是 table 的一个枢轴):
|---------------------|------------------|
| Name_1 | Name_2 |
|---------------------|------------------|
| 1.0 | 2.0 |
|---------------------|------------------|
流式数据帧上的枢轴是不允许的,但我想应该有一个解决方案。
非常感谢您的帮助。
解决方案是在重新创建数据框行的情况下添加多个聚合。
dfStreaming = spark \
.readStream \
.format("eventhubs") \
.options(**optionConf()) \
.load() \
.select(explode("message.data").alias("data")) \
.select(("data.*")) \
.selectexpr(["sum(case when Name=Name_of_desired_column then Value else null) as Name_of_desired_column"])