如何在 writeStream 到 Elasticsearch 之前将 JSON 数组转换为行?
How to transform array of JSONs to rows before writeStream to Elasticsearch?
跟进
我有JSON流数据,格式与下面相同
| A | B |
|-------|------------------------------------------|
| ABC | [{C:1, D:1}, {C:2, D:4}] |
| XYZ | [{C:3, D :6}, {C:9, D:11}, {C:5, D:12}] |
我需要将它转换成下面的格式
| A | C | D |
|-------|-----|------|
| ABC | 1 | 1 |
| ABC | 2 | 4 |
| XYZ | 3 | 6 |
| XYZ | 9 | 11 |
| XYZ | 5 | 12 |
为了实现这一点,按照上一个问题的建议执行转换。
val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")
val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum")
val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")
val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))
val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C"))
val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")
现在我需要将数据保存到 ElasticSearch。
df6.writeStream
.outputMode("complete")
.format("es")
.option("es.resource", "index/type")
.option("es.nodes", "localhost")
.option("es.port", 9200)
.start()
.awaitTermination()
我收到 ElasticSearch 不支持 Append
输出模式的错误。在 Append
模式下写入 writeStream
失败,无法在 Append
模式下完成聚合。我能够在完整模式下写入控制台。现在如何将数据写入ElasticSearch
任何帮助将不胜感激。
这里不需要pivot
或者聚合。如果 B
列确实是 Array[Map[String, String]]
(array<map<string, string>>
in SQL 类型),您只需要一个简单的 select
或 withColumn
:
df
.withColumn("B", explode($"B"))
.select($"A", $"B"("C") as "C", $"B"("D") as "D")
跟进
我有JSON流数据,格式与下面相同
| A | B |
|-------|------------------------------------------|
| ABC | [{C:1, D:1}, {C:2, D:4}] |
| XYZ | [{C:3, D :6}, {C:9, D:11}, {C:5, D:12}] |
我需要将它转换成下面的格式
| A | C | D |
|-------|-----|------|
| ABC | 1 | 1 |
| ABC | 2 | 4 |
| XYZ | 3 | 6 |
| XYZ | 9 | 11 |
| XYZ | 5 | 12 |
为了实现这一点,按照上一个问题的建议执行转换。
val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")
val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum")
val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")
val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))
val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C"))
val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")
现在我需要将数据保存到 ElasticSearch。
df6.writeStream
.outputMode("complete")
.format("es")
.option("es.resource", "index/type")
.option("es.nodes", "localhost")
.option("es.port", 9200)
.start()
.awaitTermination()
我收到 ElasticSearch 不支持 Append
输出模式的错误。在 Append
模式下写入 writeStream
失败,无法在 Append
模式下完成聚合。我能够在完整模式下写入控制台。现在如何将数据写入ElasticSearch
任何帮助将不胜感激。
这里不需要pivot
或者聚合。如果 B
列确实是 Array[Map[String, String]]
(array<map<string, string>>
in SQL 类型),您只需要一个简单的 select
或 withColumn
:
df
.withColumn("B", explode($"B"))
.select($"A", $"B"("C") as "C", $"B"("D") as "D")