在 Kafka 主题中使用了所有可用消息后,您如何 return 包含消息列表的未来?

How do you return a future containing a list of messages after all available messages have been consumed from a Kafka topic?

我可能错过了 Kafka 消费者的观点,但我想做的是:

消费者订阅一个主题,获取该主题中的所有消息,然后return创建一个包含所有这些消息列表的 Future

我为实现此目的而编写的代码是

val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
list :+ kafkaMessage
}

def consume(topic: String) =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
  .map { message =>
    logger.info(s"Consuming ${message.record.value}")
    KafkaMessage(Some(message.record.key()), Some(message.record.value()))
  }
  .buffer(bufferSize, overflowStrategy)
  .runWith(sink)

虽然 Future 永远不会 returns,但它会消耗必要的消息,然后继续重复轮询主题。有没有办法 return Future 然后关闭消费者?

由于 Kafka 用于流式数据,因此没有“所有消息”这样的东西,因为新数据可以随时附加到主题。

我想,您可以做两件事:

  1. 检查最后 return 编辑了多少记录 poll 并终止或
  2. 您需要通过 endOffsets 获取“日志的当前结尾”,并将其与每个分区的最新记录的偏移量进行比较。如果两者匹配,那么你可以 return.

第一种方法更简单,但可能有缺点,即不如第二种方法可靠。理论上,民意调查可以 return 零记录,即使有可用记录(即使发生这种情况的可能性不是很高)。

不知道如何在 Scala 中表达这个终止条件(因为我对 Scala 不是很熟悉)。