将数据帧从一种格式转换为另一种格式

converting dataframe from one format to another

我正在创建一个 spark 结构化流式应用程序,我有一个流式数据框,其中包含以下数据。

{
"name":"sensor1",
"time":"2020-11-27T01:01:00",
"sensorvalue":11.0,
"tag1":"tagvalue"
}

我想将该数据框转换为以下格式。

{
"name":"sensor1",
"value-array":
[
{
"time":"2020-11-27T01:01:00",
"sensorvalue":11.0,
"tag1":"tagvalue"
}
]
}

我尝试使用 mapPartition() / map() 方法,在其中我得到一个行对象,并尝试以预期的格式创建另一个数据框。

我可以使用 row(0) 或 row(1) 从行对象中获取值。但是,是否可以为该行对象放置一个 POJO/schema (使用行编码器())?这样我们就可以使用 row.getName() ?

而不是使用 row(0)
val mapDF = incomingDF.map(row =>{
    val name = row(0).toString
    /* val name = row.getName */

  })

我尝试使用 collect_list() 函数。由于它是一个聚合函数,我无法在我的流应用程序中使用“追加”作为输出模式。

使用 arraystruct 函数。

检查下面的代码。

scala> df
.withColumn("value-array",array(struct($"time",$"sensorvalue",$"tag1")))
.select(to_json(struct($"name",$"value-array")).as("json_data"))

以上代码将为您提供如下输出。

+-----------------------------------------------------------------------------------------------------------+
|json_data                                                                                                  |
+-----------------------------------------------------------------------------------------------------------+
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|
+-----------------------------------------------------------------------------------------------------------+