Kafka Scala:如何从 OffsetAndMetadata 获取偏移值 class

Kafka Scala: How to get offset value from OffsetAndMetadata class

我在 offsets 上循环三次:

  1. 获取TopicPartition&offset
  2. 获取TopicPartition & OffsetAndMetadata
  3. 获取生产者与消费者之间的增量

我想知道是否可以从 OffsetAndMetadata 中获取 offset 值,但我不确定如何获取。我无法在网上找到示例来获取此值。感谢任何帮助,谢谢!

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    event.progress.sources
      // Ignoring sqs / jms sources their offsets
      .filter(source => source.description.toLowerCase().contains("kafka"))
      // ex offset :
      //    "endOffset" : {
      //      "rcs-enriched-event" : {
      //        "8" : 31376,
      //        "11" : 39114,
      //        "2" : 39376,
      //      } ...
      .foreach(source => {

        /// Map[Topic,Map[Partition, CurrentOffset]]
        val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
        jsonOffsets.keys.filter(key => topics.contains(key))
          .foreach(topic => {
            val offsets: Map[String, Int] = jsonOffsets(topic)
            val consumedPartitions = new ListBuffer[TopicPartition]()
            val topicOffsetList = new ListBuffer[Int]()

            val mapTopicPartitionOffset = offsets
              .keys
              .map(partition => {
                val tp = new TopicPartition(topic, partition.toInt)
                val offset = offsets(partition).toLong
                (tp -> offset)
              })
              .toMap

            val mapTopicPartition = offsets
              .keys
              .map(partition => {
                val tp = new TopicPartition(topic, partition.toInt)
                val oam = new OffsetAndMetadata(offsets(partition).toLong)
                (tp -> oam)
              })
              .toMap


            for(topicPartition <- mapTopicPartitionOffset){
              consumedPartitions += topicPartition
            }

            try {
              val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)

              for((topicPartition,offset) <- mapTopicPartitionOffset){

                val bbCurrentOffset =  offset

                // latest offset
                val partitionLatestOffset = kafkaPartitionOffset.get(topicPartition)

                // Partition offset delta
                val delta = partitionLatestOffset - bbCurrentOffset

                topicOffsetList += delta.abs
              }
            } catch {
              case e: Exception => {
                log.error(s"${consumerGroupId} Could not get Kafka offset", e)
              }
            }

            try {
              kafkaConsumer.commitSync(mapTopicPartition.asJava)
            } catch {
              case e: Exception => log.error(s"${consumerGroupId} Could not commit offset", e)
            }

            //log.info have the group id (unique id), the topic, cumulative consumer lag delta
            log.info("consumerGroupId: " + consumerGroupId + " topic: " + topic + " lagDeltaSum: " + topicOffsetList.sum)
          })

      })
  }

如果没有主题和分区,您将无法获得偏移值。您已经针对每个主题进行迭代,并且您的 JSON 表明您在分区映射中有键,所以这个问题只是“如何从映射中获取值?”

更重要的是,当你在其他两个地方有相同的值时,你真的需要OffsetAndMetadata获得偏移量吗?

更具体地说,您不需要 mapTopicPartitionOffsetmapTopicPartition。每个都包含完全相同的信息,一个只有一个对象作为具有空元数据 属性.

的值

您还有 offsets 映射,其值是您正在迭代的当前主题的所有偏移量,键是它的(消耗的)分区。

每个都包含您想要的偏移值

你只需要 mapTopicPartition 就可以与 commitSync 一起使用,所以试试这个

.foreach(topic => {
    val offsets: Map[String, Int] = jsonOffsets(topic)
    // for getting end offsets
    val consumedPartitions = new ListBuffer[TopicPartition]()
    // convert to values accepted by Kafka Consumer API
    val toCommit = offsets
          .keys
          .map(partition => {
            val tp = new TopicPartition(topic, partition.toInt)
            consumedPartitions += tp
            val oam = new OffsetAndMetadata(offsets.get(partition).toLong)
            (tp -> oam)
         })
        .toMap

    // could also use toCommit.keys instead of separate list 
    val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
 
    for((topicPartition,oam) <- toCommit) {

        val bbCurrentOffset = oam.offset() 
        // or offsets.get(String.valueOf(topicPartition.partition))