Kafka Consumer 读取到它开始的时间,然后永远挂起
Kafka Consumer reads up to when it started, then hangs forever
我已经设置了一个包含 1 个生产者和 1 个消费者的 Kafka 解决方案,并验证了所有连接是否正确(我可以生成消息并使用它们)。 ZK Server & Kakfa Server 已启动并稳定
如前所述,我的问题是消费者可以很好地从它停止的地方读取消息,但只会读取在它开始读取之前创建的消息。之后,新消息将不会被读取,直到我杀死消费者并重新启动他。
相关消费者 Scala 代码
val consumer = Consumer.create(new ConsumerConfig(readConsumerPropertiesFromConfig))
val filterSpec = new Whitelist("some-valid-topic")
val stream: KafkaStream[String, String] =
consumer.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head
log.info(s"Consumer started. Listening to topics [$filterSpec].")
def read() = stream map digest
digest 使用 MessageAndMetadata 并从中获得乐趣
def digest(messageAndMeta: MessageAndMetadata[String, String]) = {
log.info(s"processing the message [$messageAndMeta]")
属性是
properties.put("group.id", "default_consumer_group")
properties.put("zookeeper.connect", "localhost:2181")
properties.put("auto.offset.reset", "smallest")
properties.put("consumer.timeout.ms", 2000)
我可以用它重现的时间表
- 产生 5 条消息
- 启动消费者
- 消费者阅读了 5 条消息
- 再生成 15 条消息
- 消费者忽略新消息并永远挂起
- 杀死并重启消费者
- 消费者读取了 15 条消息,再次永远挂起
有什么想法吗?谢谢
我怀疑您的问题是调用 map 以便从流中使用。
尝试通过 stream.iterator.hasNext 和 stream.iterator.next 直接使用迭代器,看看是否有所不同。这里有一个例子:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
此外,如果 2 秒内没有可用数据,您将期望收到 ConsumerTimeoutException,因此请确保您的代码已准备好处理该异常。
问题是我忽略了一个让我的 Consumer 崩溃的 ConsumerTimeoutException,我把它误认为是 "the Consumer hanging forever"。
来自消费者配置文档:
By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption.
我将此设置为几秒钟,之后它会抛出。通过将它设置为 -1,我得到了想要的行为,尽管理想的解决方案(对于我的用例)是按照这个项目的方式实现一些东西:https://github.com/kciesielski/reactive-kafka
This thread pointed me in the right direction
希望对其他人有所帮助。
我已经设置了一个包含 1 个生产者和 1 个消费者的 Kafka 解决方案,并验证了所有连接是否正确(我可以生成消息并使用它们)。 ZK Server & Kakfa Server 已启动并稳定
如前所述,我的问题是消费者可以很好地从它停止的地方读取消息,但只会读取在它开始读取之前创建的消息。之后,新消息将不会被读取,直到我杀死消费者并重新启动他。
相关消费者 Scala 代码
val consumer = Consumer.create(new ConsumerConfig(readConsumerPropertiesFromConfig))
val filterSpec = new Whitelist("some-valid-topic")
val stream: KafkaStream[String, String] =
consumer.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head
log.info(s"Consumer started. Listening to topics [$filterSpec].")
def read() = stream map digest
digest 使用 MessageAndMetadata 并从中获得乐趣
def digest(messageAndMeta: MessageAndMetadata[String, String]) = {
log.info(s"processing the message [$messageAndMeta]")
属性是
properties.put("group.id", "default_consumer_group")
properties.put("zookeeper.connect", "localhost:2181")
properties.put("auto.offset.reset", "smallest")
properties.put("consumer.timeout.ms", 2000)
我可以用它重现的时间表
- 产生 5 条消息
- 启动消费者
- 消费者阅读了 5 条消息
- 再生成 15 条消息
- 消费者忽略新消息并永远挂起
- 杀死并重启消费者
- 消费者读取了 15 条消息,再次永远挂起
有什么想法吗?谢谢
我怀疑您的问题是调用 map 以便从流中使用。
尝试通过 stream.iterator.hasNext 和 stream.iterator.next 直接使用迭代器,看看是否有所不同。这里有一个例子: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
此外,如果 2 秒内没有可用数据,您将期望收到 ConsumerTimeoutException,因此请确保您的代码已准备好处理该异常。
问题是我忽略了一个让我的 Consumer 崩溃的 ConsumerTimeoutException,我把它误认为是 "the Consumer hanging forever"。
来自消费者配置文档:
By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption.
我将此设置为几秒钟,之后它会抛出。通过将它设置为 -1,我得到了想要的行为,尽管理想的解决方案(对于我的用例)是按照这个项目的方式实现一些东西:https://github.com/kciesielski/reactive-kafka
This thread pointed me in the right direction
希望对其他人有所帮助。