Kafka Consumer 读取到它开始的时间,然后永远挂起

Kafka Consumer reads up to when it started, then hangs forever

我已经设置了一个包含 1 个生产者和 1 个消费者的 Kafka 解决方案,并验证了所有连接是否正确(我可以生成消息并使用它们)。 ZK Server & Kakfa Server 已启动并稳定

如前所述,我的问题是消费者可以很好地从它停止的地方读取消息,但只会读取在它开始读取之前创建的消息。之后,新消息将不会被读取,直到我杀死消费者并重新启动他。

相关消费者 Scala 代码

  val consumer = Consumer.create(new ConsumerConfig(readConsumerPropertiesFromConfig))
  val filterSpec = new Whitelist("some-valid-topic")

  val stream: KafkaStream[String, String] =
    consumer.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head

  log.info(s"Consumer started. Listening to topics [$filterSpec].")

  def read() = stream map digest

digest 使用 MessageAndMetadata 并从中获得乐趣

def digest(messageAndMeta: MessageAndMetadata[String, String]) = {
    log.info(s"processing the message [$messageAndMeta]")

属性是

properties.put("group.id", "default_consumer_group")
properties.put("zookeeper.connect", "localhost:2181")
properties.put("auto.offset.reset", "smallest")
properties.put("consumer.timeout.ms", 2000)

我可以用它重现的时间表

有什么想法吗?谢谢

我怀疑您的问题是调用 map 以便从流中使用。

尝试通过 stream.iterator.hasNext 和 stream.iterator.next 直接使用迭代器,看看是否有所不同。这里有一个例子: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

此外,如果 2 秒内没有可用数据,您将期望收到 ConsumerTimeoutException,因此请确保您的代码已准备好处理该异常。

问题是我忽略了一个让我的 Consumer 崩溃的 ConsumerTimeoutException,我把它误认为是 "the Consumer hanging forever"。

来自消费者配置文档:

By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption.

我将此设置为几秒钟,之后它会抛出。通过将它设置为 -1,我得到了想要的行为,尽管理想的解决方案(对于我的用例)是按照这个项目的方式实现一些东西:https://github.com/kciesielski/reactive-kafka

This thread pointed me in the right direction

希望对其他人有所帮助。