Kafka Scala:如何从 OffsetAndMetadata 获取偏移值 class
Kafka Scala: How to get offset value from OffsetAndMetadata class
我在 offsets
上循环三次:
- 获取
TopicPartition
&offset
- 获取
TopicPartition
& OffsetAndMetadata
- 获取生产者与消费者之间的增量
我想知道是否可以从 OffsetAndMetadata
中获取 offset
值,但我不确定如何获取。我无法在网上找到示例来获取此值。感谢任何帮助,谢谢!
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
event.progress.sources
// Ignoring sqs / jms sources their offsets
.filter(source => source.description.toLowerCase().contains("kafka"))
// ex offset :
// "endOffset" : {
// "rcs-enriched-event" : {
// "8" : 31376,
// "11" : 39114,
// "2" : 39376,
// } ...
.foreach(source => {
/// Map[Topic,Map[Partition, CurrentOffset]]
val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
jsonOffsets.keys.filter(key => topics.contains(key))
.foreach(topic => {
val offsets: Map[String, Int] = jsonOffsets(topic)
val consumedPartitions = new ListBuffer[TopicPartition]()
val topicOffsetList = new ListBuffer[Int]()
val mapTopicPartitionOffset = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
val offset = offsets(partition).toLong
(tp -> offset)
})
.toMap
val mapTopicPartition = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
val oam = new OffsetAndMetadata(offsets(partition).toLong)
(tp -> oam)
})
.toMap
for(topicPartition <- mapTopicPartitionOffset){
consumedPartitions += topicPartition
}
try {
val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
for((topicPartition,offset) <- mapTopicPartitionOffset){
val bbCurrentOffset = offset
// latest offset
val partitionLatestOffset = kafkaPartitionOffset.get(topicPartition)
// Partition offset delta
val delta = partitionLatestOffset - bbCurrentOffset
topicOffsetList += delta.abs
}
} catch {
case e: Exception => {
log.error(s"${consumerGroupId} Could not get Kafka offset", e)
}
}
try {
kafkaConsumer.commitSync(mapTopicPartition.asJava)
} catch {
case e: Exception => log.error(s"${consumerGroupId} Could not commit offset", e)
}
//log.info have the group id (unique id), the topic, cumulative consumer lag delta
log.info("consumerGroupId: " + consumerGroupId + " topic: " + topic + " lagDeltaSum: " + topicOffsetList.sum)
})
})
}
如果没有主题和分区,您将无法获得偏移值。您已经针对每个主题进行迭代,并且您的 JSON 表明您在分区映射中有键,所以这个问题只是“如何从映射中获取值?”
更重要的是,当你在其他两个地方有相同的值时,你真的需要从OffsetAndMetadata
获得偏移量吗?
更具体地说,您不需要 mapTopicPartitionOffset
和 mapTopicPartition
。每个都包含完全相同的信息,一个只有一个对象作为具有空元数据 属性.
的值
您还有 offsets
映射,其值是您正在迭代的当前主题的所有偏移量,键是它的(消耗的)分区。
每个都包含您想要的偏移值
你只需要 mapTopicPartition
就可以与 commitSync
一起使用,所以试试这个
.foreach(topic => {
val offsets: Map[String, Int] = jsonOffsets(topic)
// for getting end offsets
val consumedPartitions = new ListBuffer[TopicPartition]()
// convert to values accepted by Kafka Consumer API
val toCommit = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
consumedPartitions += tp
val oam = new OffsetAndMetadata(offsets.get(partition).toLong)
(tp -> oam)
})
.toMap
// could also use toCommit.keys instead of separate list
val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
for((topicPartition,oam) <- toCommit) {
val bbCurrentOffset = oam.offset()
// or offsets.get(String.valueOf(topicPartition.partition))
我在 offsets
上循环三次:
- 获取
TopicPartition
&offset
- 获取
TopicPartition
&OffsetAndMetadata
- 获取生产者与消费者之间的增量
我想知道是否可以从 OffsetAndMetadata
中获取 offset
值,但我不确定如何获取。我无法在网上找到示例来获取此值。感谢任何帮助,谢谢!
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
event.progress.sources
// Ignoring sqs / jms sources their offsets
.filter(source => source.description.toLowerCase().contains("kafka"))
// ex offset :
// "endOffset" : {
// "rcs-enriched-event" : {
// "8" : 31376,
// "11" : 39114,
// "2" : 39376,
// } ...
.foreach(source => {
/// Map[Topic,Map[Partition, CurrentOffset]]
val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
jsonOffsets.keys.filter(key => topics.contains(key))
.foreach(topic => {
val offsets: Map[String, Int] = jsonOffsets(topic)
val consumedPartitions = new ListBuffer[TopicPartition]()
val topicOffsetList = new ListBuffer[Int]()
val mapTopicPartitionOffset = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
val offset = offsets(partition).toLong
(tp -> offset)
})
.toMap
val mapTopicPartition = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
val oam = new OffsetAndMetadata(offsets(partition).toLong)
(tp -> oam)
})
.toMap
for(topicPartition <- mapTopicPartitionOffset){
consumedPartitions += topicPartition
}
try {
val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
for((topicPartition,offset) <- mapTopicPartitionOffset){
val bbCurrentOffset = offset
// latest offset
val partitionLatestOffset = kafkaPartitionOffset.get(topicPartition)
// Partition offset delta
val delta = partitionLatestOffset - bbCurrentOffset
topicOffsetList += delta.abs
}
} catch {
case e: Exception => {
log.error(s"${consumerGroupId} Could not get Kafka offset", e)
}
}
try {
kafkaConsumer.commitSync(mapTopicPartition.asJava)
} catch {
case e: Exception => log.error(s"${consumerGroupId} Could not commit offset", e)
}
//log.info have the group id (unique id), the topic, cumulative consumer lag delta
log.info("consumerGroupId: " + consumerGroupId + " topic: " + topic + " lagDeltaSum: " + topicOffsetList.sum)
})
})
}
如果没有主题和分区,您将无法获得偏移值。您已经针对每个主题进行迭代,并且您的 JSON 表明您在分区映射中有键,所以这个问题只是“如何从映射中获取值?”
更重要的是,当你在其他两个地方有相同的值时,你真的需要从OffsetAndMetadata
获得偏移量吗?
更具体地说,您不需要 mapTopicPartitionOffset
和 mapTopicPartition
。每个都包含完全相同的信息,一个只有一个对象作为具有空元数据 属性.
您还有 offsets
映射,其值是您正在迭代的当前主题的所有偏移量,键是它的(消耗的)分区。
每个都包含您想要的偏移值
你只需要 mapTopicPartition
就可以与 commitSync
一起使用,所以试试这个
.foreach(topic => {
val offsets: Map[String, Int] = jsonOffsets(topic)
// for getting end offsets
val consumedPartitions = new ListBuffer[TopicPartition]()
// convert to values accepted by Kafka Consumer API
val toCommit = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
consumedPartitions += tp
val oam = new OffsetAndMetadata(offsets.get(partition).toLong)
(tp -> oam)
})
.toMap
// could also use toCommit.keys instead of separate list
val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
for((topicPartition,oam) <- toCommit) {
val bbCurrentOffset = oam.offset()
// or offsets.get(String.valueOf(topicPartition.partition))