Apache Kafka 是否提供异步订阅回调 API?

Does Apache Kafka provide an asynchronous subscription callback API?

我的项目正在将 Apache Kafka 视为老化的基于 JMS 的消息传递方法的潜在替代品。为了让这个过渡尽可能的顺利,如果替换队列系统(Kafka)有一个异步订阅机制就好了,类似于我们现在项目的JMS机制,使用MessageListener and MessageConsumer订阅主题,异步接收通知。我不太关心 Kafka 是否严格遵守 JMS API,但相反,如果我不这样做,我宁愿不重新设计我们的整个发布-订阅-通知套件 类不需要。

我可以找到各种 KafkaConsumer polling examples,但到目前为止还没有找到任何通过异步通知向客户端通知新消息的示例。

有谁知道当前版本的 Kafka(截至本文 post 时为 0.10.2)是否提供了这样的 API,或者我是否一直在尝试重写遗留代码使用轮询?

Kafka 客户端仅提供按需池机制,但您可以使用 spring-kafka. It provides MessageListener interface and KafkaListener annotation and similar. See documentation

您可能想试用 Confluent Kafka JMS 客户端。

http://docs.confluent.io/3.2.0/clients/kafka-jms-client/docs/index.html

Kafka JMS Client 是 Kafka Producer/Consumer API 之上的 JMS API 包装器,因此它具有所有标准的 JMS 1.1 接口。这是一项企业(订阅)功能,但如果您下载 Confluent Enterprise,您可以免费试用 30 天。

https://www.confluent.io/download/

如果您在使用完所有可用消息后可以接受一点延迟,则可以使用计时器并调用 consumer.poll(0),returns 会立即使用可用消息。消费完消息后,您再次设置计时器,延迟时间相同,比如 100 毫秒。

当吞吐量较低时,批次会较小,这种延迟会更频繁地干预。然而,由于场景是异步的,延迟也不是很关键。无论如何,您永远不知道消息何时到达。

当吞吐量很高时,批次会很大。对于新的 Kafka 消费者,fetch.max.bytes 的默认值为 52428800。与处理大量消息所花费的时间相比,额外的延迟会相对较小。

您可以将它包装在一个小组件中,您可以将对应于当前 MBean 处理程序的函数赋予该组件。