Scala Kafka Spark: Get all kafkaConsumer endOffsets and assign it to a val
The code below is inefficient: it queries kafkaConsumer inside the for loop on every iteration (where it says // move code below). How can I move that call up to // move it here so that each topic is only queried once? I believe I have to collect all of the TopicPartitions from jsonOffsets and pass them to kafkaConsumer.endOffsets in a single call, but I am not sure how to do that. endOffsets accepts a java.util.Collection[TopicPartition].
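For reference, KafkaConsumer.endOffsets takes a java.util.Collection[TopicPartition] and returns a java.util.Map[TopicPartition, java.lang.Long], so all partitions of a topic can be batched into one request. A minimal sketch, assuming a consumer: KafkaConsumer[_, _] is already in scope (the topic and partition numbers just mirror the example JSON in the code below):

import scala.jdk.CollectionConverters._ // scala.collection.JavaConverters._ on Scala 2.12
import org.apache.kafka.common.TopicPartition

// Build the partition list once, then issue a single endOffsets request
val partitions = Seq(
  new TopicPartition("rcs-enriched-event", 8),
  new TopicPartition("rcs-enriched-event", 11)
)
val ends: java.util.Map[TopicPartition, java.lang.Long] = consumer.endOffsets(partitions.asJava)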
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
  event.progress.sources
    // Ignore sqs / jms sources and their offsets
    .filter(source => source.description.toLowerCase().contains("kafka"))
    // example endOffset JSON:
    // "endOffset" : {
    //   "rcs-enriched-event" : {
    //     "8" : 31376,
    //     "11" : 39114,
    //   } ...
    .foreach(source => {
      // Map[Topic, Map[Partition, CurrentOffset]]
      val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
      jsonOffsets.keys.filter(key => topics.contains(key))
        .foreach(topic => {
          val topicPartitionMap = new java.util.HashMap[TopicPartition, OffsetAndMetadata]()
          // per-partition lag deltas for this topic
          val topicOffsetList = new ListBuffer[Long]()
          // Map[Partition, CurrentOffset]
          val offsets: Option[Map[String, Int]] = jsonOffsets.get(topic)
          // move it here
          offsets match {
            case Some(topicOffsetData) =>
              topicOffsetData.keys.foreach(partition => {
                // e.g. "4" : 34937
                val tp = new TopicPartition(topic, partition.toInt)
                val oam = new OffsetAndMetadata(topicOffsetData(partition).toLong)
                val bbCurrentOffset = topicOffsetData(partition).toLong
                // move code below
                val kafkaPartitionOffset = kafkaConsumer.endOffsets(java.util.Arrays.asList(tp))
                // latest offset
                val partitionLatestOffset = kafkaPartitionOffset.get(tp)
                // lag for this particular partition
                val delta = partitionLatestOffset - bbCurrentOffset
                topicOffsetList += delta.abs
                topicPartitionMap.put(tp, oam)
              })
            case None => // no offsets recorded for this topic
          }
          try {
            kafkaConsumer.commitSync(topicPartitionMap)
          } catch {
            case e: Exception => log.error(s"${groupId} Could not commit offset", e)
          }
          // log the group id (unique id), the topic, and the cumulative consumer lag delta
          log.info("groupId: " + groupId + " topic: " + topic + " lagDeltaSum: " + topicOffsetList.sum)
        })
    })
}
It looks to me like you should loop over offsets twice.
.foreach(topic => {
  val offsets: Option[Map[String, Int]] = jsonOffsets.get(topic)
  // First pass: collect every consumed TopicPartition for this topic
  val consumedPartitions = new ListBuffer[TopicPartition]()
  offsets match {
    case Some(topicOffsetData) =>
      topicOffsetData.keys.foreach(partition => {
        val tp = new TopicPartition(topic, partition.toInt)
        consumedPartitions += tp
      })
    case None =>
  }
  // Single endOffsets request per topic (asJava comes from scala.jdk.CollectionConverters._)
  val latestOffsets = kafkaConsumer.endOffsets(consumedPartitions.asJava)
  // Second pass: compare against the latest offsets
  offsets match {
    case Some(topicOffsetData) =>
      // Use latest offsets, as needed ...
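One way the second pass might be filled in, as a sketch only: it reuses topic, offsets, latestOffsets, topicPartitionMap, and topicOffsetList from the surrounding code, and mirrors the delta and commit bookkeeping from the question rather than anything the answer spells out:

offsets match {
  case Some(topicOffsetData) =>
    topicOffsetData.keys.foreach(partition => {
      val tp = new TopicPartition(topic, partition.toInt)
      val bbCurrentOffset = topicOffsetData(partition).toLong
      // The latest offset now comes from the single batched endOffsets result
      val partitionLatestOffset = latestOffsets.get(tp)
      topicOffsetList += (partitionLatestOffset - bbCurrentOffset).abs
      topicPartitionMap.put(tp, new OffsetAndMetadata(bbCurrentOffset))
    })
  case None =>
}

Either way, the broker is asked for end offsets once per topic instead of once per partition, which is the whole point of collecting the TopicPartitions first.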