Scala Kafka Spark: Get all kafkaConsumer endOffsets and assign them to a val

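The code below is inefficient: it queries kafkaConsumer on every iteration of the inner loop (marked <!-- move code below -->). How can I move that call to <!-- move it here --> so that only one request per topic is needed? I believe I have to collect all the TopicPartitions from jsonOffsets and pass them to kafkaConsumer.endOffsets, but I am not sure how to do that.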

endOffsets accepts a collection of TopicPartition.

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    event.progress.sources
      // Ignoring sqs / jms sources their offsets
      .filter(source => source.description.toLowerCase().contains("kafka"))
      // ex offset :
      //    "endOffset" : {
      //      "rcs-enriched-event" : {
      //        "8" : 31376,
      //        "11" : 39114,
      //      } ...
      .foreach(source => {

        /// Map[Topic,Map[Partition, CurrentOffset]]
        val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
        jsonOffsets.keys.filter(key => topics.contains(key))
          .foreach(topic => {
            val topicPartitionMap = new java.util.HashMap[TopicPartition, OffsetAndMetadata]()

            // Map[Partition, CurrentOffset]
            val topicOffsetList = new ListBuffer[Int]()

            val offsets: Option[Map[String, Int]] = jsonOffsets.get(topic)

            
 <!-- move it here -->

            offsets match {
              case Some(topicOffsetData) =>
                topicOffsetData.keys.foreach(partition => {
                  /// "4" : 34937
                  val tp = new TopicPartition(topic, partition.toInt)
                  val oam = new OffsetAndMetadata(topicOffsetData(partition).toLong)

                  val bbCurrentOffset = topicOffsetData(partition).toLong

                  <!-- move code below -->
                  val kafkaPartitionOffset = kafkaConsumer.endOffsets(java.util.Arrays.asList(tp)) 

                  // latest offset
                  val partitionLatestOffset = kafkaPartitionOffset.get(tp)

                  // Log for a particular partition
                  val delta = partitionLatestOffset - bbCurrentOffset

                  topicOffsetList += delta.abs

                  topicPartitionMap.put(tp, oam)
                })
            }
            try {
              kafkaConsumer.commitSync(topicPartitionMap)
            } catch {
              case e: Exception => log.error(s"${groupId} Could not commit offset", e)
            }

            //log.info have the group id (unique id), the topic, cumulative consumer lag delta
            //push out to logger
            log.info("groupId: " + groupId + " topic: " + topic + " lagDeltaSum: " + topicOffsetList.sum)
          })

      })
  }

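It looks to me like you should loop over the offsets twice: once to collect the partitions, and once to use the latest offsets.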

.foreach(topic => {
    val offsets: Option[Map[String, Int]] = jsonOffsets.get(topic)
    // First pass: collect every TopicPartition consumed for this topic
    val consumedPartitions = new ListBuffer[TopicPartition]()
    offsets match {
      case Some(topicOffsetData) =>
        topicOffsetData.keys.foreach(partition => {
          val tp = new TopicPartition(topic, partition.toInt)
          consumedPartitions += tp
        })
      case None => // no offsets reported for this topic
    }
    // Single request per topic; .asJava needs scala.collection.JavaConverters._ in scope
    val latestOffsets = kafkaConsumer.endOffsets(consumedPartitions.asJava)
    
    offsets match {
      case Some(topicOffsetData) =>
         // Use latest offsets, as needed ...
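
For reference, the two-pass idea can be sketched end to end without a live broker. This is only a sketch under assumptions: LagSketch, TopicPartitionKey, lagPerTopic and fetchEndOffsets are hypothetical names that do not appear in the question, and fetchEndOffsets stands in for kafkaConsumer.endOffsets so the logic can be run on its own.

```scala
// Sketch only: TopicPartitionKey and fetchEndOffsets are hypothetical stand-ins
// for Kafka's TopicPartition and kafkaConsumer.endOffsets.
object LagSketch {
  final case class TopicPartitionKey(topic: String, partition: Int)

  /** Sums |latest - consumed| per topic, calling fetchEndOffsets once per topic. */
  def lagPerTopic(
      jsonOffsets: Map[String, Map[String, Long]],
      topics: Set[String],
      fetchEndOffsets: Seq[TopicPartitionKey] => Map[TopicPartitionKey, Long]
  ): Map[String, Long] =
    jsonOffsets.collect {
      case (topic, partitionOffsets) if topics.contains(topic) =>
        // First pass: build every partition key for this topic.
        val consumed: Map[TopicPartitionKey, Long] = partitionOffsets.map {
          case (partition, offset) => TopicPartitionKey(topic, partition.toInt) -> offset
        }
        // One lookup for the whole topic instead of one per partition.
        val latest = fetchEndOffsets(consumed.keys.toSeq)
        // Second pass: compare consumed offsets with the latest ones.
        // A partition missing from the lookup contributes zero lag.
        val lagSum = consumed.map { case (tp, offset) =>
          math.abs(latest.getOrElse(tp, offset) - offset)
        }.sum
        topic -> lagSum
    }
}
```

With the real client, fetchEndOffsets would presumably wrap a single kafkaConsumer.endOffsets call: convert each key to a TopicPartition, pass the collection with .asJava, and convert the returned java.util.Map back to a Scala map. The commitSync and logging steps from the question are left out here and would stay in the per-topic loop.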