发布-订阅通道都转到 Kafka 导致重复的 KafkaProducerContexts
Publish-Subscribe Channels Both Going to Kafka Result in Duplicate KafkaProducerContexts
我正在尝试使用 Spring 集成将数据从一个通道发送到两个不同的 Kafka 队列,这些相同的数据在到达各自队列的途中经过不同的转换。问题是我显然有重复的生产者上下文,我不知道为什么。
这是我的流程配置:
flow -> flow
.channel(“firstChannel")
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.transform(firstTransformer::transform)
.channel(MessageChannels.queue(50))
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(firstMetadata(), brokerAddress), e -> e.id(“firstKafkaOutboundChannelAdapter")
.autoStartup(true)
.poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
.get())
)
.subscribe(f -> f
.transform(secondTransformer::transform)
.channel(MessageChannels.queue(50))
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(secondMetadata(), brokerAddress), e -> e.id(“secondKafkaOutboundChannelAdapter")
.autoStartup(true)
.poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
.get())
));
例外情况是:
无法在 bean 名称 'not_specified' 下注册对象 [org.springframework.integration.kafka.support.KafkaProducerContext@3163987e]:已经绑定了对象 [org.springframework.integration.kafka.support.KafkaProducerContext@15f193b8]
我曾尝试使用不同的 kafkaConfig
对象,但这没有帮助。同时,ProducerMetadata
实例是不同的,您可以从 addProducer
的不同第一个参数中看出。这些提供了其他元数据中各个目标队列的名称。
听起来有些正在创建的隐式 bean 定义相互冲突。
如何用两个 KafkaProducerContext
解决这个异常?
你不应该在那些 KafkaProducerMessageHandlerSpec
上使用 .get()
并让 Framework 为你解决环境。
这个问题是因为 KafkaProducerMessageHandlerSpec implements ComponentsRegistration
并且没有人关心:
public Collection<Object> getComponentsToRegister() {
this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
return Collections.<Object>singleton(this.kafkaProducerContext);
}
手动 .get()
调用后。
我同意,这会带来一些不便,我们应该为 end-application 找到更好的解决方案,但目前还没有选择,除非遵循框架组件的 Spec
样式,例如 Kafka.outboundChannelAdapter()
.
希望我清楚。
更新
好的,这绝对是我们这边的问题。我们会尽快修复它:
https://jira.spring.io/browse/INTEXT-216
https://jira.spring.io/browse/INTEXT-217
同时,您的解决方法如下:
KafkaProducerContext kafkaProducerContext = (KafkaProducerContext) kafkaProducerMessageHandlerSpec.getComponentsToRegister().iterator().next();
kafkaProducerContext.setBeanName(null);
你应该搬到哪里去
Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(firstMetadata(), brokerAddress)
到单独的 private
方法来访问 kafkaProducerContext
。
我正在尝试使用 Spring 集成将数据从一个通道发送到两个不同的 Kafka 队列,这些相同的数据在到达各自队列的途中经过不同的转换。问题是我显然有重复的生产者上下文,我不知道为什么。
这是我的流程配置:
flow -> flow
.channel(“firstChannel")
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.transform(firstTransformer::transform)
.channel(MessageChannels.queue(50))
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(firstMetadata(), brokerAddress), e -> e.id(“firstKafkaOutboundChannelAdapter")
.autoStartup(true)
.poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
.get())
)
.subscribe(f -> f
.transform(secondTransformer::transform)
.channel(MessageChannels.queue(50))
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(secondMetadata(), brokerAddress), e -> e.id(“secondKafkaOutboundChannelAdapter")
.autoStartup(true)
.poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
.get())
));
例外情况是:
无法在 bean 名称 'not_specified' 下注册对象 [org.springframework.integration.kafka.support.KafkaProducerContext@3163987e]:已经绑定了对象 [org.springframework.integration.kafka.support.KafkaProducerContext@15f193b8]
我曾尝试使用不同的 kafkaConfig
对象,但这没有帮助。同时,ProducerMetadata
实例是不同的,您可以从 addProducer
的不同第一个参数中看出。这些提供了其他元数据中各个目标队列的名称。
听起来有些正在创建的隐式 bean 定义相互冲突。
如何用两个 KafkaProducerContext
解决这个异常?
你不应该在那些 KafkaProducerMessageHandlerSpec
上使用 .get()
并让 Framework 为你解决环境。
这个问题是因为 KafkaProducerMessageHandlerSpec implements ComponentsRegistration
并且没有人关心:
public Collection<Object> getComponentsToRegister() {
this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
return Collections.<Object>singleton(this.kafkaProducerContext);
}
手动 .get()
调用后。
我同意,这会带来一些不便,我们应该为 end-application 找到更好的解决方案,但目前还没有选择,除非遵循框架组件的 Spec
样式,例如 Kafka.outboundChannelAdapter()
.
希望我清楚。
更新
好的,这绝对是我们这边的问题。我们会尽快修复它: https://jira.spring.io/browse/INTEXT-216 https://jira.spring.io/browse/INTEXT-217
同时,您的解决方法如下:
KafkaProducerContext kafkaProducerContext = (KafkaProducerContext) kafkaProducerMessageHandlerSpec.getComponentsToRegister().iterator().next();
kafkaProducerContext.setBeanName(null);
你应该搬到哪里去
Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(firstMetadata(), brokerAddress)
到单独的 private
方法来访问 kafkaProducerContext
。