当控制进入 catch 块功能 kafka spring 时如何停止发送到 kafka 主题
How to stop sending to kafka topic when control goes to catch block Functional kafka spring
请问我如何停止发送到我的第 3 个 kafka 主题,当控件到达 catch 块时,当前消息被发送到错误主题以及它应该发送到的主题以防万一的正常处理。一段代码如下:
@Component
public class Abc {
private final StreamBridge streamBridge;
public Abc (StreamBridge streamBridge)
this.streamBridge = streamBridge;
@Bean
public Function<KStream<String, KafkaClass>, KStream<String,KafkaClass>> hiProcess() {
return input -> input.map((key,value) -> {
try{
KafkaClass stream = processFunction();
}
catch(Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
}
return new KeyValue<>(key, stream);
})
}
}
这可以使用以下模式实现:
KafkaClass stream;
return input -> input
.branch((k, v) -> {
try {
stream = processFunction();
return true;
}
catch (Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
return false;
}
},
(k, v) -> true)[0]
.map((k, v) -> new KeyValue<>(k, stream));
在这里,我们使用 KStream
的分支功能 (API) 将您的输入分成两条路径 - 正常流程和导致错误的流程。这是通过为 branch
方法调用提供两个过滤器来实现的。第一个过滤器是您调用 processFunction
方法并获得响应的正常流程。如果我们没有得到异常,过滤器 returns true
,分支操作的结果被捕获在输出数组 [0]
的第一个元素中,该元素在下游处理map
将最终结果发送到出站主题的操作。
另一方面,如果它抛出异常,它会使用 StreamBridge
和过滤器 returns false
将任何必要的内容发送到错误主题。由于下游 map
操作仅对来自分支 [0]
的数组的第一个元素执行,因此不会向外发送任何内容。当第一个过滤器 returns false
时,它会转到第二个过滤器,它总是 returns true
。这是一个空操作过滤器,结果被完全忽略。
此特定实现的一个缺点是您需要将来自 processFunction
的响应存储在一个实例字段中,然后对每个传入的 KStream
记录进行变异,以便您可以在发送输出的最终 map
方法。但是,对于这个特定的用例,这可能不是问题。
请问我如何停止发送到我的第 3 个 kafka 主题,当控件到达 catch 块时,当前消息被发送到错误主题以及它应该发送到的主题以防万一的正常处理。一段代码如下:
@Component
public class Abc {
private final StreamBridge streamBridge;
public Abc (StreamBridge streamBridge)
this.streamBridge = streamBridge;
@Bean
public Function<KStream<String, KafkaClass>, KStream<String,KafkaClass>> hiProcess() {
return input -> input.map((key,value) -> {
try{
KafkaClass stream = processFunction();
}
catch(Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
}
return new KeyValue<>(key, stream);
})
}
}
这可以使用以下模式实现:
KafkaClass stream;
return input -> input
.branch((k, v) -> {
try {
stream = processFunction();
return true;
}
catch (Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
return false;
}
},
(k, v) -> true)[0]
.map((k, v) -> new KeyValue<>(k, stream));
在这里,我们使用 KStream
的分支功能 (API) 将您的输入分成两条路径 - 正常流程和导致错误的流程。这是通过为 branch
方法调用提供两个过滤器来实现的。第一个过滤器是您调用 processFunction
方法并获得响应的正常流程。如果我们没有得到异常,过滤器 returns true
,分支操作的结果被捕获在输出数组 [0]
的第一个元素中,该元素在下游处理map
将最终结果发送到出站主题的操作。
另一方面,如果它抛出异常,它会使用 StreamBridge
和过滤器 returns false
将任何必要的内容发送到错误主题。由于下游 map
操作仅对来自分支 [0]
的数组的第一个元素执行,因此不会向外发送任何内容。当第一个过滤器 returns false
时,它会转到第二个过滤器,它总是 returns true
。这是一个空操作过滤器,结果被完全忽略。
此特定实现的一个缺点是您需要将来自 processFunction
的响应存储在一个实例字段中,然后对每个传入的 KStream
记录进行变异,以便您可以在发送输出的最终 map
方法。但是,对于这个特定的用例,这可能不是问题。