如何在读取来自 Kafka 的消息流时处理 Avro 消息?
How to process Avro messages while reading a stream of messages from Kafka?
下面的代码从 Kafka 读取消息并且消息在 Avro 中,那么我如何解析消息并将其放入 Spark 2.2.0 中的数据帧中?
Dataset<Row> df = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
这个 https://github.com/databricks/spark-avro 库没有流案例的例子。
how do I parse the message and put it into a dataframe in Spark 2.2.0?
这是你的家庭练习,需要一些编码。
This https://github.com/databricks/spark-avro library had no example for streaming case.
有人告诉我(并在这里看到了几个问题)spark-avro 不 支持 Spark Structured Streaming(又名 Spark Streams)。它适用于非流式数据集,但不能处理流式数据集。
这就是为什么我写道这是你必须自己编写代码的原因。
可能如下所示(为简单起见,我使用 Scala):
// Step 1. convert messages to be strings
val avroMessages = df.select($"value" cast "string")
// Step 2. Strip the avro layer off
val from_avro = udf { (s: String) => ...processing here... }
val cleanDataset = avroMessages.withColumn("no_avro_anymore", from_avro($"value"))
这将需要开发一个 from_avro
自定义 UDF 来执行您想要的操作(并且类似于 Spark 使用 from_json
标准函数处理 JSON 格式的方式!)
或者(稍微更高级一些?/复杂的方法)为 Kafka 中 Avro 格式的数据集编写您自己的自定义流 Source 并改用它。
Dataset<Row> df = sparkSession.readStream()
.format("avro-kafka") // <-- HERE YOUR CUSTOM Source
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
我还没有发现 avro-kafka
格式的可行性。它确实可行,但同时做两件事,即从 Kafka 读取 和 进行 Avro 转换,我不相信这是在 Spark Structured Streaming 和软件工程中做事的方式一般的。我希望有一种方法可以一个接一个地应用一种格式,但这在 Spark 2.2.1 中是不可能的(并且也不计划用于 2.3)。
那么我认为UDF是目前最好的解决方案。
想一想,您还可以编写自定义 Kafka Deserializer,在 Spark 加载消息时进行反序列化。
下面的代码从 Kafka 读取消息并且消息在 Avro 中,那么我如何解析消息并将其放入 Spark 2.2.0 中的数据帧中?
Dataset<Row> df = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
这个 https://github.com/databricks/spark-avro 库没有流案例的例子。
how do I parse the message and put it into a dataframe in Spark 2.2.0?
这是你的家庭练习,需要一些编码。
This https://github.com/databricks/spark-avro library had no example for streaming case.
有人告诉我(并在这里看到了几个问题)spark-avro 不 支持 Spark Structured Streaming(又名 Spark Streams)。它适用于非流式数据集,但不能处理流式数据集。
这就是为什么我写道这是你必须自己编写代码的原因。
可能如下所示(为简单起见,我使用 Scala):
// Step 1. convert messages to be strings
val avroMessages = df.select($"value" cast "string")
// Step 2. Strip the avro layer off
val from_avro = udf { (s: String) => ...processing here... }
val cleanDataset = avroMessages.withColumn("no_avro_anymore", from_avro($"value"))
这将需要开发一个 from_avro
自定义 UDF 来执行您想要的操作(并且类似于 Spark 使用 from_json
标准函数处理 JSON 格式的方式!)
或者(稍微更高级一些?/复杂的方法)为 Kafka 中 Avro 格式的数据集编写您自己的自定义流 Source 并改用它。
Dataset<Row> df = sparkSession.readStream()
.format("avro-kafka") // <-- HERE YOUR CUSTOM Source
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
我还没有发现 avro-kafka
格式的可行性。它确实可行,但同时做两件事,即从 Kafka 读取 和 进行 Avro 转换,我不相信这是在 Spark Structured Streaming 和软件工程中做事的方式一般的。我希望有一种方法可以一个接一个地应用一种格式,但这在 Spark 2.2.1 中是不可能的(并且也不计划用于 2.3)。
那么我认为UDF是目前最好的解决方案。
想一想,您还可以编写自定义 Kafka Deserializer,在 Spark 加载消息时进行反序列化。