如何根据条件向 2 个 kafka 主题发布消息 - spring 云流
How to publish message to 2 kafka topics based on condition - spring cloud stream
目前我有一个 spring clound 函数,它使用一个主题并发布到另一个主题。但对于特定情况,我需要将消息发布到另一个主题。基本上需要从 spring 云功能向多个主题发布消息。
当前代码片段
@Bean
public Function<Message<InputMessage>, Message<OutputMessage>>
messageTransformer(){
return new KafkaTransformer();
}
public class KafkaTransformer
implements Function<
Message<InputMessage>, Message<OutputMessage>> {
@Override
public Message<OutputMessage> apply(
Message<InputMessage> inputMessage) {
try {
Message<OutputMessage> outputMessage = process(inputMessage);
return outputMessage;
} catch (Exception e) {
// need to send message to another topic ( which is other than dlq).
}
}
}
spring.cloud.stream.bindings.messageTransformer-in-0.destination=input.topic
spring.cloud.stream.bindings.messageTransformer-out-0.destination=output.topic
spring.cloud.function.definition=messageTransformer
您是否考虑过为此使用 StreamBridge
API?听起来应该可以满足您的需求。这是 docs.
目前我有一个 spring clound 函数,它使用一个主题并发布到另一个主题。但对于特定情况,我需要将消息发布到另一个主题。基本上需要从 spring 云功能向多个主题发布消息。
当前代码片段
@Bean
public Function<Message<InputMessage>, Message<OutputMessage>>
messageTransformer(){
return new KafkaTransformer();
}
public class KafkaTransformer
implements Function<
Message<InputMessage>, Message<OutputMessage>> {
@Override
public Message<OutputMessage> apply(
Message<InputMessage> inputMessage) {
try {
Message<OutputMessage> outputMessage = process(inputMessage);
return outputMessage;
} catch (Exception e) {
// need to send message to another topic ( which is other than dlq).
}
}
}
spring.cloud.stream.bindings.messageTransformer-in-0.destination=input.topic
spring.cloud.stream.bindings.messageTransformer-out-0.destination=output.topic
spring.cloud.function.definition=messageTransformer
您是否考虑过为此使用 StreamBridge
API?听起来应该可以满足您的需求。这是 docs.