如何在 Kafka Streams 中处理 Not authorized to access topic ...
How to handle Not authorized to access topic ... in Kafka Streams
情况如下
我们在 Kafka Broker 中设置了 SSL + ACL。
我们正在设置流,它读取来自两个主题的消息:
KStream<String, String> stringInput
= kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName );
stringInput
.filter( streamFilter::passOrFilterMessages )
.map( processor )
.to( outTopicName );
它完成了两次(在循环中)。
然后我们设置通用错误处理程序:
streams.setUncaughtExceptionHandler( ( Thread t, Throwable e ) -> {
synchronized ( this ) {
LOG.fatal( ... );
this.stop();
}
}
);
问题如下。例如,如果在一个主题证书中不再有效。流正在抛出异常未授权访问主题...
到目前为止,一切都很好。
但是异常是由通用错误处理程序处理的,所以即使第二个主题没有问题,整个应用程序也会停止。
问题是,如何处理每个主题的异常?
如何避免one single topic 授权有问题导致某个时刻整个应用停止的情况?
我了解如果 Broker 不可用,则完整的应用程序可能会停止。但如果只有一个主题不可用,则单流停止,不完成应用,或者?
根据设计,Kafka Streams 将拓扑视为一体,无法区分这两部分。对于您的特定情况,当您循环并构建到独立管道时,您可以 运行 两个并行的 KafkaStreams
实例(在同一个 application/JVM 中)以将两者相互隔离。因此,如果一个失败,另一个不会受到影响。您需要为两个实例使用两个不同的 application.id
。
情况如下
我们在 Kafka Broker 中设置了 SSL + ACL。
我们正在设置流,它读取来自两个主题的消息:
KStream<String, String> stringInput
= kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName );
stringInput
.filter( streamFilter::passOrFilterMessages )
.map( processor )
.to( outTopicName );
它完成了两次(在循环中)。 然后我们设置通用错误处理程序:
streams.setUncaughtExceptionHandler( ( Thread t, Throwable e ) -> {
synchronized ( this ) {
LOG.fatal( ... );
this.stop();
}
}
);
问题如下。例如,如果在一个主题证书中不再有效。流正在抛出异常未授权访问主题... 到目前为止,一切都很好。
但是异常是由通用错误处理程序处理的,所以即使第二个主题没有问题,整个应用程序也会停止。
问题是,如何处理每个主题的异常? 如何避免one single topic 授权有问题导致某个时刻整个应用停止的情况?
我了解如果 Broker 不可用,则完整的应用程序可能会停止。但如果只有一个主题不可用,则单流停止,不完成应用,或者?
根据设计,Kafka Streams 将拓扑视为一体,无法区分这两部分。对于您的特定情况,当您循环并构建到独立管道时,您可以 运行 两个并行的 KafkaStreams
实例(在同一个 application/JVM 中)以将两者相互隔离。因此,如果一个失败,另一个不会受到影响。您需要为两个实例使用两个不同的 application.id
。