In the functional model of Spring Cloud Stream with Kafka, how can I send a record to another topic (an error topic) when an exception occurs?
Below is a snippet of the function. Please suggest how to send the data to a different topic depending on whether an error occurs.
public Function<KStream<String, ?>, KStream<String, ?>> process() {
    return input -> input.map((key, value) -> {
        try {
            // logic of function here
        } catch (Exception e) {
            // How do I send to a different topic from here?
        }
        return new KeyValue<>(key, value);
    });
}
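Set the enableDlq option of the Kafka consumer binding to true. When the listener throws an exception, the record is sent to the dead-letter topic once the retries are exhausted. If you want to fail immediately, set the maxAttempts property of the consumer binding to 1 (the default is 3).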
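As a rough sketch of those two settings in application.properties: the binding name process-in-0 assumes the function bean is named process, the group name my-group is hypothetical, and the fragment assumes the message-channel Kafka binder that the documentation quoted below describes.

```properties
# Hypothetical consumer group; the default DLQ topic name includes the group.
spring.cloud.stream.bindings.process-in-0.group=my-group
# Fail on the first exception instead of retrying (the default is 3 attempts).
spring.cloud.stream.bindings.process-in-0.consumer.maxAttempts=1
# Forward failed records to the dead-letter topic.
spring.cloud.stream.kafka.bindings.process-in-0.consumer.enableDlq=true
```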
enableDlq
When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>. The DLQ topic name is configurable by setting the dlqName property or by defining a @Bean of type DlqDestinationResolver. This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome. See Dead-Letter Topic Processing for more information. Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[]. By default, a failed record is sent to the same partition number in the DLQ topic as the original record. See Dead-Letter Topic Partition Selection for how to change that behavior. Not allowed when destinationIsPattern is true.
Default: false.
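Because the headers above arrive as byte[], a consumer of the DLQ topic has to decode them before it can log or inspect them. The following is a minimal, library-free sketch of that step. The class DlqHeaders, the helper text, and the sample values are hypothetical, a plain Map stands in for the Kafka record headers, and UTF-8 encoding of the header values is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class DlqHeaders {

    // Header names listed in the enableDlq documentation quoted above.
    static final String ORIGINAL_TOPIC = "x-original-topic";
    static final String EXCEPTION_MESSAGE = "x-exception-message";

    // Decodes a byte[] header value as text; returns null when the header is absent.
    // Assumption: the value was written as UTF-8 bytes.
    static String text(Map<String, byte[]> headers, String name) {
        byte[] raw = headers.get(name);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical header values standing in for a record read from the DLQ topic.
        Map<String, byte[]> headers = new HashMap<>();
        headers.put(ORIGINAL_TOPIC, "orders".getBytes(StandardCharsets.UTF_8));
        headers.put(EXCEPTION_MESSAGE, "boom".getBytes(StandardCharsets.UTF_8));

        System.out.println(text(headers, ORIGINAL_TOPIC) + ": " + text(headers, EXCEPTION_MESSAGE));
    }
}
```

In a real listener the same decoding would be applied to the header values of the consumed record rather than to a hand-built map.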