如何将 DataSet<Row> 转换为 JSON 消息的 DataSet 以写入 Kafka?
How to Convert DataSet<Row> to DataSet of JSON messages to write to Kafka?
我用的是 Spark 2.1.1.
我有以下 DataSet<Row>
ds1;
name | ratio | count // column names
"hello" | 1.56 | 34
(ds1.isStreaming
给出 true
)
我正在尝试生成 DataSet<String>
ds2。换句话说,当我写到 kafka 接收器时,我想写这样的东西
{"name": "hello", "ratio": 1.56, "count": 34}
我试过类似这样的东西 df2.toJSON().writeStream().foreach(new KafkaSink()).start()
但是它给出了以下错误
Queries with streaming sources must be executed with writeStream.start()
有 to_json
和 json_tuple
但是我不确定如何在这里利用它们?
我使用 json_tuple()
函数
尝试了以下操作
Dataset<String> df4 = df3.select(json_tuple(new Column("result"), " name", "ratio", "count")).as(Encoders.STRING());
我收到以下错误:
cannot resolve 'result
' given input columns: [name, ratio, count];;
tl;dr 使用 struct
函数后跟 to_json
(因为 toJSON
由于 SPARK-17029 that got fixed just 20 days ago).
引用 struct 的 scaladoc:
struct(colName: String, colNames: String*): Column Creates a new struct column that composes multiple input columns.
假设您使用 Java API,您也有 4 个不同的 struct 函数变体:
public static Column struct(Column... cols) Creates a new struct column.
使用 to_json 功能,您的情况已涵盖:
public static Column to_json(Column e) Converts a column containing a StructType into a JSON string with the specified schema.
以下是 Scala 代码(将其翻译成 Java 是您的家庭练习):
val ds1 = Seq(("hello", 1.56, 34)).toDF("name", "ratio", "count")
val recordCol = to_json(struct("name", "ratio", "count")) as "record"
scala> ds1.select(recordCol).show(truncate = false)
+----------------------------------------+
|record |
+----------------------------------------+
|{"name":"hello","ratio":1.56,"count":34}|
+----------------------------------------+
我也尝试了您的解决方案(使用今天构建的 Spark 2.3.0-SNAPSHOT),它似乎运行良好。
val fromKafka = spark.
readStream.
format("kafka").
option("subscribe", "topic1").
option("kafka.bootstrap.servers", "localhost:9092").
load.
select('value cast "string")
fromKafka.
toJSON. // <-- JSON conversion
writeStream.
format("console"). // using console sink
start
format("kafka")
已添加到 SPARK-19719 中,但在 2.1.0 中不可用。
我用的是 Spark 2.1.1.
我有以下 DataSet<Row>
ds1;
name | ratio | count // column names
"hello" | 1.56 | 34
(ds1.isStreaming
给出 true
)
我正在尝试生成 DataSet<String>
ds2。换句话说,当我写到 kafka 接收器时,我想写这样的东西
{"name": "hello", "ratio": 1.56, "count": 34}
我试过类似这样的东西 df2.toJSON().writeStream().foreach(new KafkaSink()).start()
但是它给出了以下错误
Queries with streaming sources must be executed with writeStream.start()
有 to_json
和 json_tuple
但是我不确定如何在这里利用它们?
我使用 json_tuple()
函数
Dataset<String> df4 = df3.select(json_tuple(new Column("result"), " name", "ratio", "count")).as(Encoders.STRING());
我收到以下错误:
cannot resolve '
result
' given input columns: [name, ratio, count];;
tl;dr 使用 struct
函数后跟 to_json
(因为 toJSON
由于 SPARK-17029 that got fixed just 20 days ago).
引用 struct 的 scaladoc:
struct(colName: String, colNames: String*): Column Creates a new struct column that composes multiple input columns.
假设您使用 Java API,您也有 4 个不同的 struct 函数变体:
public static Column struct(Column... cols) Creates a new struct column.
使用 to_json 功能,您的情况已涵盖:
public static Column to_json(Column e) Converts a column containing a StructType into a JSON string with the specified schema.
以下是 Scala 代码(将其翻译成 Java 是您的家庭练习):
val ds1 = Seq(("hello", 1.56, 34)).toDF("name", "ratio", "count")
val recordCol = to_json(struct("name", "ratio", "count")) as "record"
scala> ds1.select(recordCol).show(truncate = false)
+----------------------------------------+
|record |
+----------------------------------------+
|{"name":"hello","ratio":1.56,"count":34}|
+----------------------------------------+
我也尝试了您的解决方案(使用今天构建的 Spark 2.3.0-SNAPSHOT),它似乎运行良好。
val fromKafka = spark.
readStream.
format("kafka").
option("subscribe", "topic1").
option("kafka.bootstrap.servers", "localhost:9092").
load.
select('value cast "string")
fromKafka.
toJSON. // <-- JSON conversion
writeStream.
format("console"). // using console sink
start
format("kafka")
已添加到 SPARK-19719 中,但在 2.1.0 中不可用。