spring cloud stream和kafka的功能模型,如果发生异常,如何发送到另一个主题(错误主题)?

In functional model of spring cloud stream and kafka, how can I send to another topic( error topic) in case of an exception occured?

下面是该功能的一个片段,请根据是否有错误建议如何将数据发送到不同的主题

public Function<KStream<String,?>, KStream<String,?>> process(){
return input -> input.map(key, value) {
try{
// logic of function here 
}catch(Exception e) {
// How do I send to different topic from here??
}
return new KeyValue<>(key,value);
}
}

将 kafka 消费者绑定的 enableDlq 选项设置为 true;当侦听器抛出异常时,重试次数耗尽后,记录将发送到死信主题。如果您想立即失败,请将消费者绑定的 maxAttempts 属性 设置为 1(默认值为 3)。

参见 the documentation

enableDlq

When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>. The DLQ topic name can be configurable by setting the dlqName property or by defining a @Bean of type DlqDestinationResolver. This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome. See Dead-Letter Topic Processing processing for more information. Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[]. By default, a failed record is sent to the same partition number in the DLQ topic as the original record. See Dead-Letter Topic Partition Selection for how to change that behavior. Not allowed when destinationIsPattern is true.

Default: false.