如何在 Kafka Streams 中处理 Not authorized to access topic ...

How to handle Not authorized to access topic ... in Kafka Streams

情况如下

我们在 Kafka Broker 中设置了 SSL + ACL。

我们正在设置流,它读取来自两个主题的消息:

KStream<String, String> stringInput 
    = kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName );

stringInput
    .filter( streamFilter::passOrFilterMessages )
    .map( processor )
    .to( outTopicName );

它完成了两次(在循环中)。 然后我们设置通用错误处理程序:

streams.setUncaughtExceptionHandler( ( Thread t, Throwable e ) -> {
                    synchronized ( this ) {
                        LOG.fatal( ... );
                        this.stop();
                    }
                }
        );

问题如下。例如,如果在一个主题证书中不再有效。流正在抛出异常未授权访问主题... 到目前为止,一切都很好。

但是异常是由通用错误处理程序处理的,所以即使第二个主题没有问题,整个应用程序也会停止。

问题是,如何处理每个主题的异常? 如何避免one single topic 授权有问题导致某个时刻整个应用停止的情况?

我了解如果 Broker 不可用,则完整的应用程序可能会停止。但如果只有一个主题不可用,则单流停止,不完成应用,或者?

根据设计,Kafka Streams 将拓扑视为一体,无法区分这两部分。对于您的特定情况,当您循环并构建到独立管道时,您可以 运行 两个并行的 KafkaStreams 实例(在同一个 application/JVM 中)以将两者相互隔离。因此,如果一个失败,另一个不会受到影响。您需要为两个实例使用两个不同的 application.id