使用 spring 集成 Kafka 适配器时 spring kafka 的动态主题名称配置?
Dynamic topic name configuration for spring kafka when using spring integration Kafka adapters?
我有 spring 个集成流程,我需要再次使用它。
@Bean
public IntegrationFlow sendToKafkaFlowRequest(@Value("${kafka.document-consume-topic}") String topic,
ProducerFactory<?, Message> producerFactory) {
return IntegrationFlows.from("kafkaRequestChannel")
.handle(Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.topicExpression("headers[kafka_topic] ?: '" + topic + "'"))
.get();
}
@Bean
public IntegrationFlow listeningFromKafkaFlow(@Value("${kafka.document-consume-topic}") String topic,
ConsumerFactory<?, Message> consumerFactory) {
return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory, ListenerMode.record, topic)
.configureListenerContainer(c -> c.ackMode(AbstractMessageListenerContainer.AckMode.RECORD))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.channel("interMessageChannel")
.get();
}
我想为多个主题一次又一次地重用这两个流程。但问题是主题是硬编码的。
问题是我们可以使用消息的 headers 来放置主题名称吗?这会是一个问题吗 ?
好的。看,Kafka.messageDrivenChannelAdapter()
可以接受多个主题,因此您只需要一个流程即可。
是的,你总是可以在通过Kafka.outboundChannelAdapter()
发送到Kafka之前在消息中设置kafka_topic
头。再说一遍:您也不需要在此处复制 IntegrationFlow
。仅发送一条针对主题解析的消息就足够了。
我有 spring 个集成流程,我需要再次使用它。
@Bean
public IntegrationFlow sendToKafkaFlowRequest(@Value("${kafka.document-consume-topic}") String topic,
ProducerFactory<?, Message> producerFactory) {
return IntegrationFlows.from("kafkaRequestChannel")
.handle(Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.topicExpression("headers[kafka_topic] ?: '" + topic + "'"))
.get();
}
@Bean
public IntegrationFlow listeningFromKafkaFlow(@Value("${kafka.document-consume-topic}") String topic,
ConsumerFactory<?, Message> consumerFactory) {
return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory, ListenerMode.record, topic)
.configureListenerContainer(c -> c.ackMode(AbstractMessageListenerContainer.AckMode.RECORD))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.channel("interMessageChannel")
.get();
}
我想为多个主题一次又一次地重用这两个流程。但问题是主题是硬编码的。 问题是我们可以使用消息的 headers 来放置主题名称吗?这会是一个问题吗 ?
好的。看,Kafka.messageDrivenChannelAdapter()
可以接受多个主题,因此您只需要一个流程即可。
是的,你总是可以在通过Kafka.outboundChannelAdapter()
发送到Kafka之前在消息中设置kafka_topic
头。再说一遍:您也不需要在此处复制 IntegrationFlow
。仅发送一条针对主题解析的消息就足够了。