是否可以在Kafka+Spark Streaming中获取特定的消息偏移量?
Is it possible to obtain specific message offset in Kafka+SparkStreaming?
我正在尝试使用 Spark Direct Stream 在 Kafka 中获取和存储特定消息的偏移量。
查看 Spark 文档很容易获得每个分区的范围偏移量,但我需要的是在完整扫描队列后存储主题的每条消息的起始偏移量。
是的,您可以使用 createDirectStream
的 MessageAndMetadata 版本,它允许您访问 message metadata
。
您可以在此处找到 tuple3
的 returns Dstream 示例。
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker))
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic ,mmd.offset, mmd.message().toString)
})
在上面的示例中,tuple3._1
将具有 topic
,tuple3._2
将具有 offset
,而 tuple3._3
将具有 message
。
希望对您有所帮助!
我正在尝试使用 Spark Direct Stream 在 Kafka 中获取和存储特定消息的偏移量。 查看 Spark 文档很容易获得每个分区的范围偏移量,但我需要的是在完整扫描队列后存储主题的每条消息的起始偏移量。
是的,您可以使用 createDirectStream
的 MessageAndMetadata 版本,它允许您访问 message metadata
。
您可以在此处找到 tuple3
的 returns Dstream 示例。
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker))
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic ,mmd.offset, mmd.message().toString)
})
在上面的示例中,tuple3._1
将具有 topic
,tuple3._2
将具有 offset
,而 tuple3._3
将具有 message
。
希望对您有所帮助!