具有错误处理和状态存储回滚的处理器拓扑

Processor topology with error handling and state store rollback

我已经给出了从主题、处理器和接收器到其他主题的源拓扑

StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
              Stores.persistentKeyValueStore("store"),
              Serdes.String(),
              Serdes.String());
Topology topology = new Topology();
topology.addSource("incoming", Serdes.String().deserializer(), Serdes.String().deserializer(), "topic");
topology.addProcessor("incoming_first", () -> new MyProcessor(), "incoming");
topology.addStateStore(storeBuilder, "incoming_first");
topology.addSink("sink", "sink", "incoming_first"),
public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> stateStore;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.stateStore = (KeyValueStore<String, String>) context.getStateStore("store");
    }

    @Override
    public void process(String key, String value) {
        stateStore.put(key, value);
        ....
        throw new RuntimeException();
        ....
        context.forward(); //forward to sink
    }

    @Override
    public void close() {
    }
}

我的问题是当写入state store后处理器出现异常时如何处理。 Kafka 是否有一些错误处理机制与状态存储回滚来重新处理消息或将其转发到错误主题?

目前,在没有任何处理的情况下,我的应用程序完全死机了,我需要重新启动它。 此外,如果我添加一些 try-catch,则标识为 ok 的消息和我的状态存储已更新,并且消息被发送到 changelog 主题。

状态存储是否需要一些回滚机制?

https://issues.apache.org/jira/browse/KAFKA-7192 KIP 说,如果发生异常,不应使用 EOS 处理状态存储,但这仅适用于我的整个应用程序死亡的情况。

提前致谢!

对于从 Processor 抛出的任何异常,相应的线程将始终终止。防止这种情况的唯一方法是捕获所有异常并相应地处理它们(无论哪种正确的处理方式适合您的应用程序)。

如果一个线程死了,你重启你的应用程序来恢复线程,这取决于你的配置是否回滚存储。默认情况下,不会回滚存储。仅当您通过设置配置参数 processing.guarantees="exactly_once" 启用 exactly-once 语义时,存储才会在重新启动时回滚。

如果您在 Processor 代码中发现任何异常并且您的业务逻辑需要回滚商店,您需要自己实现,首先从商店获取旧值,更新商店,并在出现异常时将旧值放回存储中以 overwrite/undo 您所有的写入。