Kafka Streams:由于恢复期间更改日志状态发生变化而无法重新平衡
Kafka Streams: Failed to rebalance due to Change log state changing during restoring
需要一些帮助来找出我在其中一个 Kafka 流消费者中收到的异常。
我已经使用低级处理器 API 实现了 Kafka 流。对于我们从 Kafka 收到的每个更新,它被合并并更新到密钥库,以便维护状态。最初我们 运行 只有一个消费者,一段时间后我们尝试提出第二个消费者。但是第二个消费者在重新平衡期间抛出了一个异常,说明它没有重新平衡。发生这种情况是因为更改日志的状态已更改(下面的异常共享)。我假设,当重新平衡发生时,第一个消费者收到了一些更新,因此更新被推送到相应的更改日志。请帮忙。还共享相同的示例代码。
我正在使用 Kafka 2_11 0.10.2.1 并且主题有 72 个分区
异常
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: java.lang.IllegalStateException: task [0_60] Log end offset of Kafka-xxxxxxxxxxxxxxxx-InfoStore-changelog-60 should not change while restoring: old end offset 80638, current offset 80640
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:56)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.run(MeteredKeyValueStore.java:100)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
at org.apache.kafka.streams.processor.internals.StreamThread.access0(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread.onPartitionsAssigned(StreamThread.java:234)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
代码段
public class InfoProcessor extends AbstractProcessor<Key, Update> {
private static Logger logger = Logger.getLogger(InfoProcessor.class);
private ProcessorContext context;
private KeyValueStore<Key, Info> infoStore;
private int visitProcessorInstanceId;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
}
@Override
public void process(Key key, Update update) {
try {
if (key != null && update != null) {
Info info = infoStore.get(key);
// merge logic
infoStore.put(key, info);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
}
context.commit();
}
@Override
public void punctuate(long timestamp) {
try {
KeyValueIterator<Key, Info> iter = this.infoStore.all();
while (iter.hasNext()) {
// processing logic
}
iter.close();
context.commit();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
谢谢。
您的观察和推理是正确的。如果由于状态迁移导致重新平衡需要很长时间并且发生另一个重新平衡,则可能会发生这种情况:
- 一审是运行
- 第二个实例启动,触发重新平衡
- 二审重现状态
- 又一次重新平衡发生了(不确定在你的情况下是如何触发的)
- 第二个实例仍在重新创建状态并且没有重新加入组(因此它退出了组)
- 第一个实例将状态迁移回来(它仍然有一个完整的状态副本,所以没有什么可以重新创建——第二个实例还没有开始处理任何东西)并继续写入更新日志主题
- 二审异常死亡,如描述
你能证实一下吗?如果是,只要状态娱乐是 运行.
,就需要避免第二次再平衡
顺便说一句:此行为已在 trunk
中得到改进,并将在即将发布的 0.11.0.1
版本中得到修复。您可以将 Kafka Streams 应用程序更新为 0.11.0.1
,而无需升级代理。 0.11.0.1
应该会在接下来的几周内发布。
需要一些帮助来找出我在其中一个 Kafka 流消费者中收到的异常。
我已经使用低级处理器 API 实现了 Kafka 流。对于我们从 Kafka 收到的每个更新,它被合并并更新到密钥库,以便维护状态。最初我们 运行 只有一个消费者,一段时间后我们尝试提出第二个消费者。但是第二个消费者在重新平衡期间抛出了一个异常,说明它没有重新平衡。发生这种情况是因为更改日志的状态已更改(下面的异常共享)。我假设,当重新平衡发生时,第一个消费者收到了一些更新,因此更新被推送到相应的更改日志。请帮忙。还共享相同的示例代码。 我正在使用 Kafka 2_11 0.10.2.1 并且主题有 72 个分区
异常
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: java.lang.IllegalStateException: task [0_60] Log end offset of Kafka-xxxxxxxxxxxxxxxx-InfoStore-changelog-60 should not change while restoring: old end offset 80638, current offset 80640
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:56)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.run(MeteredKeyValueStore.java:100)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
at org.apache.kafka.streams.processor.internals.StreamThread.access0(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread.onPartitionsAssigned(StreamThread.java:234)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
代码段
public class InfoProcessor extends AbstractProcessor<Key, Update> {
private static Logger logger = Logger.getLogger(InfoProcessor.class);
private ProcessorContext context;
private KeyValueStore<Key, Info> infoStore;
private int visitProcessorInstanceId;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
}
@Override
public void process(Key key, Update update) {
try {
if (key != null && update != null) {
Info info = infoStore.get(key);
// merge logic
infoStore.put(key, info);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
}
context.commit();
}
@Override
public void punctuate(long timestamp) {
try {
KeyValueIterator<Key, Info> iter = this.infoStore.all();
while (iter.hasNext()) {
// processing logic
}
iter.close();
context.commit();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
谢谢。
您的观察和推理是正确的。如果由于状态迁移导致重新平衡需要很长时间并且发生另一个重新平衡,则可能会发生这种情况:
- 一审是运行
- 第二个实例启动,触发重新平衡
- 二审重现状态
- 又一次重新平衡发生了(不确定在你的情况下是如何触发的)
- 第二个实例仍在重新创建状态并且没有重新加入组(因此它退出了组)
- 第一个实例将状态迁移回来(它仍然有一个完整的状态副本,所以没有什么可以重新创建——第二个实例还没有开始处理任何东西)并继续写入更新日志主题
- 二审异常死亡,如描述
你能证实一下吗?如果是,只要状态娱乐是 运行.
,就需要避免第二次再平衡顺便说一句:此行为已在 trunk
中得到改进,并将在即将发布的 0.11.0.1
版本中得到修复。您可以将 Kafka Streams 应用程序更新为 0.11.0.1
,而无需升级代理。 0.11.0.1
应该会在接下来的几周内发布。