Spark Streaming GroupBy 元组的一部分进行处理
Spark Streaming GroupBy Parts of a Tuple to Process
我是 运行 一个来自 Kafka 的火花流工作 运行。我收到这样的消息:
val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](ssc, getKafkaBrokers(), getKafkaTopics("raw"), (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic, mmd.partition, mmd.offset, mmd.message)
})
现在,当我引入数据时,我想按主题和分区分组,这样所有具有相同 topic/partition 的东西我都可以在一批中处理。在这里使用什么功能是正确的
messageStream.foreachRDD(x => x.?
是groupBy吗?如果它是一个 groupBy 如何按我所拥有的元组的前 2 部分进行分组。KafkaRDD[0] 中会有很多消息,所以我想将它们分组到一组类似的消息中,以便能够处理每个消息分组为一个块与单个消息。
编辑:
所以根据下面的反馈,我会有这样的东西:
messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
?
}))
现在是不是像K是(topic, partition), value是(offset, topic)这样的一个K,V?
我需要元组的第一部分和第二部分,因为这将允许我进行 API 调用以获取有关如何处理消息的说明。我不想做的是在每条消息上单独调用 API,因为很多消息都具有基于 topic/partition.
的相同指令集
编辑:
意识到它现在是:
K:(Topic, Partition) V: CompactBuffer((Topic, Partition, Offset, Message), ()) 等
messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
val topic = x._1_.1
val partition = x._1._2
x._2.forEach(x=> ...
}))
对元组中的前两部分进行groupBy,可以尝试如下:
messageStream groupBy (x => (x._1, x._2))
我是 运行 一个来自 Kafka 的火花流工作 运行。我收到这样的消息:
val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](ssc, getKafkaBrokers(), getKafkaTopics("raw"), (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic, mmd.partition, mmd.offset, mmd.message)
})
现在,当我引入数据时,我想按主题和分区分组,这样所有具有相同 topic/partition 的东西我都可以在一批中处理。在这里使用什么功能是正确的
messageStream.foreachRDD(x => x.?
是groupBy吗?如果它是一个 groupBy 如何按我所拥有的元组的前 2 部分进行分组。KafkaRDD[0] 中会有很多消息,所以我想将它们分组到一组类似的消息中,以便能够处理每个消息分组为一个块与单个消息。
编辑: 所以根据下面的反馈,我会有这样的东西:
messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
?
}))
现在是不是像K是(topic, partition), value是(offset, topic)这样的一个K,V? 我需要元组的第一部分和第二部分,因为这将允许我进行 API 调用以获取有关如何处理消息的说明。我不想做的是在每条消息上单独调用 API,因为很多消息都具有基于 topic/partition.
的相同指令集编辑: 意识到它现在是:
K:(Topic, Partition) V: CompactBuffer((Topic, Partition, Offset, Message), ()) 等
messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
val topic = x._1_.1
val partition = x._1._2
x._2.forEach(x=> ...
}))
对元组中的前两部分进行groupBy,可以尝试如下:
messageStream groupBy (x => (x._1, x._2))