Kafka 消费者在 iterator.hasNext() 卡住了(实际上是暂停和恢复),即使主题中有大量消息要消费
Kafka consumer stuck (in fact pauses and resumes) at iterator.hasNext() even though there are plenty of messages to consume in the topic
我正在开发一个分布式解决方案,其中有 2 个消费者 运行 在同一消费者组下的 2 台不同服务器上运行,并从具有 2 个分区和复制因子 3 的 3 机 Kafka 主题中消费。在我的消费者内部class(这是一个Callable
),关键部分如下所示:
@Override
public Object call() throws Exception {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
try {
while (it.hasNext()){
byte[] message = it.next().message();
// other code here
}
} catch (Throwable e) {
e.printStackTrace();
}
log.error("Shutting down Thread: " + streamNumber + ", kafka consumer offline!!!");
}
我的消费者 class 还产生了 16 个其他线程来处理消费的消息。当我在 2 个不同的服务器上启动我的两个消费者时,最初几分钟他们每个人似乎都在无缝地使用来自 Kafka 主题的消息(每个一个分区)。然而,在一定时间后,每个消费者似乎都停留在 while (it.hasNext())
语句,即使每个分区中还有数千条消息要消费。下面的截图显示了当时 Kafka 消费者偏移量的状态。
如您所见,消费者远远落后于主题中可用消息的数量。从我的日志中可以看出,虽然这个消耗线程已暂停,但其他线程 运行 正常并正在执行它们的工作。从更长的 运行,有趣的是,我还注意到消费线程在一段时间后有点暂停和恢复。然而,每次暂停,下一次消费的消息数量也会减少得离谱。例如,在我首先启动两个消费者之后,每个消费者似乎都无缝地消费了大约 15,000 条消息,直到卡在流迭代器处,然后暂停了 20 - 25 分钟并消费了 5,000 多条消息,然后再次暂停了约 30 分钟并消费了再多 100 个,这样继续下去。如果我停止消费者进程并重新启动,整个循环似乎会重复。
这些是我正在使用的消费者配置:
group.id=ct_job_backfill
zookeeper.session.timeout.ms=1000
zookeeper.sync.time.ms=200
auto.commit.enable=true
auto.offset.reset=smallest
rebalance.max.retries=20
rebalance.backoff.ms=2000
topic.name=contentTaskProd
每个消费者服务器都是 32 线程的 64 GB 机器运行正在 Linux。
知道是什么原因造成的吗?提前致谢。如果您需要更多信息或有任何不清楚的地方,请告诉我。
UPDATE: I have tried increasing the number of partitions from 2 to 32, and inside each of my consumer server spawning 16 consumers threads each consuming from a partition. However, that doesn't seem to change the behaviour. I notice the same pause and resume cycle.
我遇到了完全相同的问题。在浏览解决方案时,我遇到了已在 https://issues.apache.org/jira/browse/KAFKA-2978.
上用 kafka 报告的问题
看起来他们已经在 0.9.0.1 版本中解决了。我将尝试使用此版本更新库。如果我能够用新的 jar 解决这个问题,将会更新。同时,您可以尝试相同的方法。
~干杯
我正在开发一个分布式解决方案,其中有 2 个消费者 运行 在同一消费者组下的 2 台不同服务器上运行,并从具有 2 个分区和复制因子 3 的 3 机 Kafka 主题中消费。在我的消费者内部class(这是一个Callable
),关键部分如下所示:
@Override
public Object call() throws Exception {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
try {
while (it.hasNext()){
byte[] message = it.next().message();
// other code here
}
} catch (Throwable e) {
e.printStackTrace();
}
log.error("Shutting down Thread: " + streamNumber + ", kafka consumer offline!!!");
}
我的消费者 class 还产生了 16 个其他线程来处理消费的消息。当我在 2 个不同的服务器上启动我的两个消费者时,最初几分钟他们每个人似乎都在无缝地使用来自 Kafka 主题的消息(每个一个分区)。然而,在一定时间后,每个消费者似乎都停留在 while (it.hasNext())
语句,即使每个分区中还有数千条消息要消费。下面的截图显示了当时 Kafka 消费者偏移量的状态。
如您所见,消费者远远落后于主题中可用消息的数量。从我的日志中可以看出,虽然这个消耗线程已暂停,但其他线程 运行 正常并正在执行它们的工作。从更长的 运行,有趣的是,我还注意到消费线程在一段时间后有点暂停和恢复。然而,每次暂停,下一次消费的消息数量也会减少得离谱。例如,在我首先启动两个消费者之后,每个消费者似乎都无缝地消费了大约 15,000 条消息,直到卡在流迭代器处,然后暂停了 20 - 25 分钟并消费了 5,000 多条消息,然后再次暂停了约 30 分钟并消费了再多 100 个,这样继续下去。如果我停止消费者进程并重新启动,整个循环似乎会重复。
这些是我正在使用的消费者配置:
group.id=ct_job_backfill
zookeeper.session.timeout.ms=1000
zookeeper.sync.time.ms=200
auto.commit.enable=true
auto.offset.reset=smallest
rebalance.max.retries=20
rebalance.backoff.ms=2000
topic.name=contentTaskProd
每个消费者服务器都是 32 线程的 64 GB 机器运行正在 Linux。
知道是什么原因造成的吗?提前致谢。如果您需要更多信息或有任何不清楚的地方,请告诉我。
UPDATE: I have tried increasing the number of partitions from 2 to 32, and inside each of my consumer server spawning 16 consumers threads each consuming from a partition. However, that doesn't seem to change the behaviour. I notice the same pause and resume cycle.
我遇到了完全相同的问题。在浏览解决方案时,我遇到了已在 https://issues.apache.org/jira/browse/KAFKA-2978.
上用 kafka 报告的问题看起来他们已经在 0.9.0.1 版本中解决了。我将尝试使用此版本更新库。如果我能够用新的 jar 解决这个问题,将会更新。同时,您可以尝试相同的方法。 ~干杯