使用 Spark Structured Streaming 将数据写入 JSON 数组

Writing data as JSON array with Spark Structured Streaming

我必须将来自 Spark Structure 流的数据写入 JSON 数组,我尝试使用以下代码:

df.selectExpr("to_json(struct(*)) AS value").toJSON

其中 returns 我的 DataSet[String],但无法写成 JSON 数组。

当前输出:

{"name":"test","id":"id"}
{"name":"test1","id":"id1"}

预期输出:

[{"name":"test","id":"id"},{"name":"test1","id":"id1"}]

编辑(将评论移至问题中):

使用建议的 collect_list 方法后,我得到

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;

然后我尝试了这样的事情-

withColumn("timestamp", unix_timestamp(col("event_epoch"), "MM/dd/yyyy hh:mm:ss aa")) .withWatermark("event_epoch", "1 minutes") .groupBy(col("event_epoch")) .agg(max(col("event_epoch")).alias("timestamp")) 

但我不想添加新列。

您可以为此使用 SQL 内置函数 collect_list。此函数收集并 returns 一组非唯一元素(与 collect_set 相比,后者 returns 仅收集唯一元素)。

collect_list you will see that this is an aggregation function. Based on the requirements given in the Structured Streaming Programming Guide on Output Modes 的源代码中突出显示,没有水印的聚合支持输出模式“完整”和“更新”。

据我了解,您不希望添加水印和新栏目。另外,您遇到的错误

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark; 

提醒您不​​要使用“追加”输出模式。

在评论中,您提到您计划将结果生成到 Kafka 消息中。一个大 JSON 数组作为一个 Kafka 值。完整的代码看起来像

val df = spark.readStream
  .[...] // in my test I am reading from Kafka source
  .load()
  .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "offset", "partition")
  // do not forget to convert you data into a String before writing to Kafka
  .selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value")

df.writeStream
  .format("kafka")
  .outputMode("complete")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .option("checkpointLocation", "/path/to/sparkCheckpoint")
  .trigger(Trigger.ProcessingTime(10000))
  .start()
  .awaitTermination()

给定 key/value 对 (k1,v1)、(k2,v2) 和 (k3,v3) 作为输入,您将在 Kafka 主题中获得一个包含所有选定数据的值作为 JSON数组:

[{"key":"k1","value":"v1","offset":7,"partition":0}, {"key":"k2","value":"v2","offset":8,"partition":0}, {"key":"k3","value":"v3","offset":9,"partition":0}]

使用 Spark 3.0.1 和 Kafka 2.5.0 进行测试。