在 Kafka 主题中使用了所有可用消息后,您如何 return 包含消息列表的未来?
How do you return a future containing a list of messages after all available messages have been consumed from a Kafka topic?
我可能错过了 Kafka 消费者的观点,但我想做的是:
消费者订阅一个主题,获取该主题中的所有消息,然后return创建一个包含所有这些消息列表的 Future
我为实现此目的而编写的代码是
val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
list :+ kafkaMessage
}
def consume(topic: String) =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.map { message =>
logger.info(s"Consuming ${message.record.value}")
KafkaMessage(Some(message.record.key()), Some(message.record.value()))
}
.buffer(bufferSize, overflowStrategy)
.runWith(sink)
虽然 Future 永远不会 returns,但它会消耗必要的消息,然后继续重复轮询主题。有没有办法 return Future 然后关闭消费者?
由于 Kafka 用于流式数据,因此没有“所有消息”这样的东西,因为新数据可以随时附加到主题。
我想,您可以做两件事:
- 检查最后 return 编辑了多少记录
poll
并终止或
- 您需要通过
endOffsets
获取“日志的当前结尾”,并将其与每个分区的最新记录的偏移量进行比较。如果两者匹配,那么你可以 return.
第一种方法更简单,但可能有缺点,即不如第二种方法可靠。理论上,民意调查可以 return 零记录,即使有可用记录(即使发生这种情况的可能性不是很高)。
不知道如何在 Scala 中表达这个终止条件(因为我对 Scala 不是很熟悉)。
我可能错过了 Kafka 消费者的观点,但我想做的是:
消费者订阅一个主题,获取该主题中的所有消息,然后return创建一个包含所有这些消息列表的 Future
我为实现此目的而编写的代码是
val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
list :+ kafkaMessage
}
def consume(topic: String) =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.map { message =>
logger.info(s"Consuming ${message.record.value}")
KafkaMessage(Some(message.record.key()), Some(message.record.value()))
}
.buffer(bufferSize, overflowStrategy)
.runWith(sink)
虽然 Future 永远不会 returns,但它会消耗必要的消息,然后继续重复轮询主题。有没有办法 return Future 然后关闭消费者?
由于 Kafka 用于流式数据,因此没有“所有消息”这样的东西,因为新数据可以随时附加到主题。
我想,您可以做两件事:
- 检查最后 return 编辑了多少记录
poll
并终止或 - 您需要通过
endOffsets
获取“日志的当前结尾”,并将其与每个分区的最新记录的偏移量进行比较。如果两者匹配,那么你可以 return.
第一种方法更简单,但可能有缺点,即不如第二种方法可靠。理论上,民意调查可以 return 零记录,即使有可用记录(即使发生这种情况的可能性不是很高)。
不知道如何在 Scala 中表达这个终止条件(因为我对 Scala 不是很熟悉)。