Kafka Scala: How to get offset value from OffsetAndMetadata class

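I loop over the offsets three times:

  1. Get the TopicPartition and offset
  2. Get the TopicPartition and OffsetAndMetadata
  3. Get the delta between producer and consumer

I would like to know whether I can get the offset value from OffsetAndMetadata, but I'm not sure how. I couldn't find an example online for getting this value. Any help is appreciated, thanks!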

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    event.progress.sources
      // Ignoring sqs / jms sources and their offsets
      .filter(source => source.description.toLowerCase().contains("kafka"))
      // ex offset :
      //    "endOffset" : {
      //      "rcs-enriched-event" : {
      //        "8" : 31376,
      //        "11" : 39114,
      //        "2" : 39376,
      //      } ...
      .foreach(source => {

        /// Map[Topic,Map[Partition, CurrentOffset]]
        val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
        jsonOffsets.keys.filter(key => topics.contains(key))
          .foreach(topic => {
            val offsets: Map[String, Int] = jsonOffsets(topic)
            val consumedPartitions = new ListBuffer[TopicPartition]()
            // Long, because the delta between two Kafka offsets is a Long
            val topicOffsetList = new ListBuffer[Long]()

            // 1. TopicPartition -> plain offset
            val mapTopicPartitionOffset = offsets
              .keys
              .map(partition => {
                val tp = new TopicPartition(topic, partition.toInt)
                val offset = offsets(partition).toLong
                (tp -> offset)
              })
              .toMap

            // 2. TopicPartition -> OffsetAndMetadata
            val mapTopicPartition = offsets
              .keys
              .map(partition => {
                val tp = new TopicPartition(topic, partition.toInt)
                val oam = new OffsetAndMetadata(offsets(partition).toLong)
                (tp -> oam)
              })
              .toMap

            for (topicPartition <- mapTopicPartitionOffset.keys) {
              consumedPartitions += topicPartition
            }

            try {
              val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)

              // 3. delta between producer and consumer
              for ((topicPartition, offset) <- mapTopicPartitionOffset) {

                val bbCurrentOffset = offset

                // latest offset
                val partitionLatestOffset = kafkaPartitionOffset.get(topicPartition)

                // Partition offset delta
                val delta = partitionLatestOffset - bbCurrentOffset

                topicOffsetList += delta.abs
              }
            } catch {
              case e: Exception => {
                log.error(s"${consumerGroupId} Could not get Kafka offset", e)
              }
            }

            try {
              // body missing in the post; presumably the commit that the error message refers to
              kafkaConsumer.commitSync(mapTopicPartition.asJava)
            } catch {
              case e: Exception => log.error(s"${consumerGroupId} Could not commit offset", e)
            }

            //log.info have the group id (unique id), the topic, cumulative consumer lag delta
            log.info("consumerGroupId: " + consumerGroupId + " topic: " + topic + " lagDeltaSum: " + topicOffsetList.sum)
          })
      })
}


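You can't get an offset value without the topic and partition. You are already iterating per topic, and your JSON shows that the partition map has keys, so the question boils down to "how do I get a value from a map?"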


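More specifically, you don't need both mapTopicPartitionOffset and mapTopicPartition. Each contains exactly the same information; one simply wraps the offset in an object (OffsetAndMetadata) whose metadata property is empty.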
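
To make that concrete, here is a minimal sketch of the round trip between a plain offset and OffsetAndMetadata. It assumes kafka-clients is on the classpath; the object name OffsetAndMetadataDemo is hypothetical, and the topic, partition and offset are borrowed from the JSON sample in the question.

```scala
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Hypothetical demo object, not part of the original listener
object OffsetAndMetadataDemo {
  // Values taken from the "endOffset" sample in the question
  val tp = new TopicPartition("rcs-enriched-event", 8)
  val plainOffset: Long = 31376L

  // Single-argument constructor: the metadata string is left empty
  val oam = new OffsetAndMetadata(plainOffset)

  // offset() returns the wrapped long, metadata() the attached string
  val recovered: Long = oam.offset()
  val meta: String = oam.metadata()

  def main(args: Array[String]): Unit =
    println(s"$tp -> offset=$recovered, metadata='$meta'")
}
```

Since offset() hands back exactly the number that went in, a map of TopicPartition to OffsetAndMetadata can stand in for the map of TopicPartition to Long.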


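You also have the offsets map: its values are all the offsets for the topic you are currently iterating over, and its keys are that topic's (consumed) partitions.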
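
As a rough illustration of working from that map alone, the sketch below looks up each consumed offset by its partition key and computes a per-partition lag. It is plain Scala with no Kafka dependency. The object name LagFromMaps and the values in latest are made up for the example; in the real listener the latest offsets would come from kafkaConsumer.endOffsets.

```scala
// Hypothetical, self-contained sketch of the lag arithmetic
object LagFromMaps {
  // Consumed offsets for one topic, keyed by partition id as a string,
  // mirroring the "endOffset" JSON in the question
  val offsets: Map[String, Int] = Map("8" -> 31376, "11" -> 39114, "2" -> 39376)

  // Made-up latest broker offsets, standing in for kafkaConsumer.endOffsets(...)
  val latest: Map[Int, Long] = Map(8 -> 31400L, 11 -> 39114L, 2 -> 39380L)

  // Pair each partition with (latest offset - consumed offset)
  val lagPerPartition: Map[Int, Long] = offsets.map { case (partition, consumed) =>
    val p = partition.toInt
    p -> (latest(p) - consumed.toLong)
  }

  // Cumulative lag for the topic
  val totalLag: Long = lagPerPartition.values.sum

  def main(args: Array[String]): Unit =
    println(s"lag per partition: $lagPerPartition, total: $totalLag")
}
```

A single value is just offsets("8"); note that the keys are strings, so an Int partition number has to be converted with toString before the lookup.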


You only need mapTopicPartition, for use with commitSync, so try this:

.foreach(topic => {
    val offsets: Map[String, Int] = jsonOffsets(topic)
    // for getting end offsets
    val consumedPartitions = new ListBuffer[TopicPartition]()
    // convert to values accepted by Kafka Consumer API
    val toCommit = offsets
          .keys
          .map(partition => {
            val tp = new TopicPartition(topic, partition.toInt)
            consumedPartitions += tp
            // offsets is a Scala Map, so apply() returns the value directly
            val oam = new OffsetAndMetadata(offsets(partition).toLong)
            (tp -> oam)
          })
          .toMap

    // could also use toCommit.keys instead of separate list 
    val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
    for((topicPartition,oam) <- toCommit) {

        val bbCurrentOffset = oam.offset()
        // or offsets(topicPartition.partition.toString).toLong