Failed to rebalance error in Kafka Streams with more than one topic partition

Everything works fine when the source topic partition count = 1. If I increase the partitions to any value greater than 1, I see the error below. This happens with both the low-level API and the DSL. Any pointers? What might be missing?

org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] Store in-memory-avg-store's change log (cpu-streamz-in-memory-avg-store-changelog) does not contain partition 1
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier$MemoryStore.init(InMemoryKeyValueStoreSupplier.java:102)
        at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
        at org.apache.kafka.streams.processor.internals.StreamThread.access0(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread.onPartitionsAssigned(StreamThread.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)

This is an operational issue. Kafka Streams does not allow you to change the number of input topic partitions during its "life time".

If you stop a running Kafka Streams application, change the number of input topic partitions, and restart your application, it will break (with the error you see above). It is tricky to fix this for production use cases, and it is highly recommended to not change the number of input topic partitions (cf. the comments below). For POC/demos it's not difficult to fix.

In order to fix this, you should reset your application using Kafka's application reset tool (kafka-streams-application-reset.sh).
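
For example, assuming the application id is cpu-streamz (inferred from the changelog topic name in your stack trace) and a placeholder input topic, a reset could look roughly like the sketch below; the exact flags vary by Kafka version, so check the tool's --help:

    // Stop all application instances first, then run the reset tool, e.g.:
    //
    //   bin/kafka-streams-application-reset.sh \
    //       --application-id cpu-streamz \
    //       --input-topics cpu-metrics \
    //       --bootstrap-servers localhost:9092
    //
    // ("cpu-metrics" is a placeholder; use your actual input topic. Some
    //  versions of the tool additionally require --zookeeper; see --help.)

    // On the next start of the application, also wipe the instance-local state
    // ("builder" and "props" as in your existing application setup):
    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.cleanUp();  // deletes this instance's local state store directory
    streams.start();

Note that cleanUp() must be called before start() (or after close()), never while the instance is running.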

The downside of using the application reset tool is that it wipes out your whole application state. Thus, in order to get your application back into the same state as before, you need to reprocess the whole input topic from the beginning. This is of course only possible if all input data is still available, i.e., if the brokers did not delete any data while applying the topic's retention time/size policy.

Furthermore, you should note that adding partitions to input topics changes the topic's partitioning schema (by default, hash-based partitioning by key). Because Kafka Streams assumes that input topics are correctly partitioned by key, if you use the reset tool and reprocess all data, you might get wrong results, as "old" data is partitioned differently than "new" data (i.e., data written after the new partitions were added). For production use cases, you would need to read all data from your original topic and write it into a new topic (with an increased number of partitions) to get your data partitioned correctly (of course, this step might change the ordering of records with different keys; usually this should not be a problem, just wanted to mention it). Afterwards you can use the new topic as the input topic for your Streams application.
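
A minimal sketch of that copy step with plain consumer and producer clients; the topic names, the bootstrap address, and the crude end-of-topic check are illustrative assumptions:

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import java.util.Collections;
    import java.util.Properties;

    public class TopicCopyJob {
        public static void main(String[] args) {
            Properties cProps = new Properties();
            cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-copy-job");
            cProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            Properties pProps = new Properties();
            pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cProps);
                 KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pProps)) {
                consumer.subscribe(Collections.singletonList("original_topic"));
                while (true) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(5000);
                    if (records.isEmpty()) {
                        break;  // naive end-of-topic heuristic; good enough for a one-off job
                    }
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        // the default partitioner re-hashes each key over the
                        // larger partition count of the new topic
                        producer.send(new ProducerRecord<>("new_topic_with_more_partitions",
                                record.key(), record.value()));
                    }
                }
            }
        }
    }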

This repartitioning step can also easily be done within your Streams application, by using the operator through("new_topic_with_more_partitions") directly after reading the original topic and before doing any actual processing (see the sketch below).
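
With the 0.10.x DSL that could look roughly as follows; the topic names and serde settings are placeholders, and new_topic_with_more_partitions must be created manually with the higher partition count before the application starts:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import java.util.Properties;

    public class RepartitionBeforeProcessing {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cpu-streamz");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            KStreamBuilder builder = new KStreamBuilder();

            // read the original topic and immediately pipe it through a topic
            // with more partitions; everything downstream sees the data
            // partitioned over the new partition count
            KStream<String, String> repartitioned = builder
                    .<String, String>stream("original_topic")
                    .through("new_topic_with_more_partitions");

            // ... actual processing (aggregations, joins, etc.) goes here ...
            repartitioned.to("output_topic");

            new KafkaStreams(builder, props).start();
        }
    }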

In general, however, it is recommended to over-partition your topics for production use cases, so that you never need to change the number of partitions later on. The overhead of over-partitioning is rather small and saves you a lot of hassle later. This is a general recommendation when working with Kafka; it is not limited to Streams use cases.
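
For illustration, creating an over-partitioned input topic up front with the Java AdminClient (added in Kafka 0.11, i.e., after this thread; older versions would use the kafka-topics.sh tool) might look like this; the topic name, partition count, replication factor, and address are made up:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import java.util.Collections;
    import java.util.Properties;

    public class CreateOverPartitionedTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // 50 partitions even if a handful would do today: partitions are
                // cheap, and you avoid ever having to repartition the topic later
                NewTopic topic = new NewTopic("cpu-metrics", 50, (short) 3);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }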

One more remark:

Some people might suggest increasing the number of partitions of Kafka Streams' internal topics manually. This would be a hack and is not recommended, for the following reasons:

  1. It might be tricky to figure out what the right number is, as it depends on various factors (it's a Streams internal implementation detail).
  2. You also face the problem of breaking the partitioning scheme, as described in the paragraph above. Thus, your application would most likely end up in an inconsistent state.

In order to avoid an inconsistent application state, Streams does not delete any internal topics or change the number of partitions of internal topics automatically, but fails with the error message you reported. This ensures that the user is aware of all implications and performs the "cleanup" manually.

By the way: for the upcoming Kafka 0.10.2 release, this error message has been improved: https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L100-L103