使用 kafka 方法和 spark streaming 从 kafka 消费给出不同的结果
Consuming from kafka using kafka methods and spark streaming gives different result
我正在尝试使用 Spark Streaming 从 Kafka 中获取一些数据。
我创造了 2 个工作,
- 一个简单的 kafka 作业,使用:
consumeFirstStringMessageFrom(topic)
给出主题期望值。
{
"data": {
"type": "SA_LIST",
"login": "username@mycompany.com",
"updateDate": "2020-09-09T14:58:39.775Z",
"content": [
{
"sku": "800633955",
"status": "ACTIVE",
"quantity": 1
}
],
"saCode": "E40056",
"clientId": "30179801688090",
"$setOnInsert": {
"__v": 0
}
},
"operation": "UPDATE",
"type": "List"
}
- Spark 流作业:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaConfig.broker)
.option("subscribe", kafkaConfig.topic)
.option("startingOffsets", kafkaConfig.startingOffsets)
.load()
df.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime("2 seconds"))
.start().awaitTermination()
显示如下结果
{
"key": "I4NTY4NV9MSVNUX1dJU0hMSVNUIg==",
"value": "eyJkYXRhIjp7InR5cGUiOiJXSVNITElTVCIsImxvZ2luIjoiZHJlYW1lcjJAeW9wbWFpbC5jb20iLCJ1cGRhdGVEYXRZSI6Ikxpc3QifQ==",
"topic": "PLP_GLOBAL_QA",
"partition": 0,
"offset": 1826,
"timestamp": "2020-09-10T16:09:08.606Z",
"timestampType": 0
}
它似乎显示了主题信息(键、值、主题、分区、偏移量,...)我错过了什么吗?
如果需要,我可以添加更多信息。
Spark Streaming 作业以序列化形式向您显示数据,而您的 Kafka 消费者已经 de-serialized 它。
根据 Spark Structured Kafka integration guide,您不仅获得了 Kafka 消息的键和值,还获得了其他(元)信息。这是您从 Kafka 获得的每条消息的架构:
Column Type
key binary
value binary
topic string
partition int
offset long
timestamp timestamp
timestampType int
如果您只想 select 键和值,或者甚至只是 select 的值,您可以将它们转换为人类可读的字符串:
[...]
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
我正在尝试使用 Spark Streaming 从 Kafka 中获取一些数据。
我创造了 2 个工作,
- 一个简单的 kafka 作业,使用:
consumeFirstStringMessageFrom(topic)
给出主题期望值。
{
"data": {
"type": "SA_LIST",
"login": "username@mycompany.com",
"updateDate": "2020-09-09T14:58:39.775Z",
"content": [
{
"sku": "800633955",
"status": "ACTIVE",
"quantity": 1
}
],
"saCode": "E40056",
"clientId": "30179801688090",
"$setOnInsert": {
"__v": 0
}
},
"operation": "UPDATE",
"type": "List"
}
- Spark 流作业:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaConfig.broker)
.option("subscribe", kafkaConfig.topic)
.option("startingOffsets", kafkaConfig.startingOffsets)
.load()
df.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime("2 seconds"))
.start().awaitTermination()
显示如下结果
{
"key": "I4NTY4NV9MSVNUX1dJU0hMSVNUIg==",
"value": "eyJkYXRhIjp7InR5cGUiOiJXSVNITElTVCIsImxvZ2luIjoiZHJlYW1lcjJAeW9wbWFpbC5jb20iLCJ1cGRhdGVEYXRZSI6Ikxpc3QifQ==",
"topic": "PLP_GLOBAL_QA",
"partition": 0,
"offset": 1826,
"timestamp": "2020-09-10T16:09:08.606Z",
"timestampType": 0
}
它似乎显示了主题信息(键、值、主题、分区、偏移量,...)我错过了什么吗?
如果需要,我可以添加更多信息。
Spark Streaming 作业以序列化形式向您显示数据,而您的 Kafka 消费者已经 de-serialized 它。
根据 Spark Structured Kafka integration guide,您不仅获得了 Kafka 消息的键和值,还获得了其他(元)信息。这是您从 Kafka 获得的每条消息的架构:
Column Type
key binary
value binary
topic string
partition int
offset long
timestamp timestamp
timestampType int
如果您只想 select 键和值,或者甚至只是 select 的值,您可以将它们转换为人类可读的字符串:
[...]
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]