使用 Jms Pub/Sub 获取 "Dispatcher has no subscribers for jms-channel" 以及使用 DSL 的 Spring 集成
Getting "Dispatcher has no subscribers for jms-channel" using Jms Pub/Sub with Spring Integration using DSL
我根据 Java DSL 文档为 Spring 集成编写了代码。当我 运行 代码时,我不断收到以下警告,即使日志还表明消息已成功传递给两个订阅者。
2021-03-04 17:21:25.589 WARN 46929 --- [enerContainer-1] bleJmsChannel$DispatchingMessageListener : Dispatcher has no subscribers for jms-channel 'application.jmsPublishSubscribeChannel'.
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:153) ~[spring-integration-core-5.4.3.jar:5.4.3]
at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:229) ~[spring-integration-jms-5.4.4.jar:5.4.4]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:761) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:699) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076) ~[spring-jms-5.3.3.jar:5.3.3]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-03-04 17:21:26.598 INFO 46929 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload={"Greeting":"Hello from Node at Thu Mar 04 2021 17:21:25 GMT+0000 (Greenwich Mean Time)"}, headers={id=50ab26ae-7a3d-8a2e-f694-94928b5097d6, timestamp=1614878485580}]
我的应用程序中有 @EnableIntegration
,我的发布/子组件如下所示:
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.jms.ConnectionFactory;
import java.util.concurrent.TimeUnit;
@Component
public class MessageFlowPub {
protected final Log logger = LogFactory.getLog(getClass());
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(connectionFactory)
.destination("dev/")
.get();
}
@Bean
public IntegrationFlow pubSubFlow() {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel(),
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))))
.log()
.handle(System.out::println)
;
}
@Bean
public IntegrationFlow msgHandler1() {
return IntegrationFlows.from("jmsPubSubBridgeChannel1")
.bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
.log()
.handle(System.out::println)
.get();
}
@Bean
public IntegrationFlow msgHandler2() {
return IntegrationFlows.from("jmsPubSubBridgeChannel2")
.bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
.log()
.handle(System.out::println)
.get();
}
}
警告消息让我认为我在这里做的事情根本上是错误的,但我看不出是什么。
感谢@ArtemBilan,我通过删除 BroadcastCapableChannel
的 @Bean
摆脱了警告
// @Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(connectionFactory)
.destination("dev/")
.get();
}
}
好的,我找到问题所在了
您使用了 @Component
和 @Bean
方法组合。此外,您尝试从一个 bean 方法调用另一个 bean 方法:.publishSubscribeChannel(jmsPublishSubscribeChannel()
。这在 @Configuration
class 之外是不可能的。带有 @Component
的注解配置被认为是“轻量级”的,因此我们不能相互调用 bean 方法——它们只是没有被代理以通过方法调用提供适当的依赖注入。
这个应该也适合你:
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(connectionFactory)
.destination("dev/")
.get();
}
@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel,
请阅读有关轻量级配置的更多信息和 proxyBeanMethods = false
:
https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-java-basic-concepts
我根据 Java DSL 文档为 Spring 集成编写了代码。当我 运行 代码时,我不断收到以下警告,即使日志还表明消息已成功传递给两个订阅者。
2021-03-04 17:21:25.589 WARN 46929 --- [enerContainer-1] bleJmsChannel$DispatchingMessageListener : Dispatcher has no subscribers for jms-channel 'application.jmsPublishSubscribeChannel'.
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:153) ~[spring-integration-core-5.4.3.jar:5.4.3]
at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:229) ~[spring-integration-jms-5.4.4.jar:5.4.4]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:761) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:699) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076) ~[spring-jms-5.3.3.jar:5.3.3]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-03-04 17:21:26.598 INFO 46929 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload={"Greeting":"Hello from Node at Thu Mar 04 2021 17:21:25 GMT+0000 (Greenwich Mean Time)"}, headers={id=50ab26ae-7a3d-8a2e-f694-94928b5097d6, timestamp=1614878485580}]
我的应用程序中有 @EnableIntegration
,我的发布/子组件如下所示:
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.jms.ConnectionFactory;
import java.util.concurrent.TimeUnit;
@Component
public class MessageFlowPub {
protected final Log logger = LogFactory.getLog(getClass());
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(connectionFactory)
.destination("dev/")
.get();
}
@Bean
public IntegrationFlow pubSubFlow() {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel(),
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))))
.log()
.handle(System.out::println)
;
}
@Bean
public IntegrationFlow msgHandler1() {
return IntegrationFlows.from("jmsPubSubBridgeChannel1")
.bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
.log()
.handle(System.out::println)
.get();
}
@Bean
public IntegrationFlow msgHandler2() {
return IntegrationFlows.from("jmsPubSubBridgeChannel2")
.bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
.log()
.handle(System.out::println)
.get();
}
}
警告消息让我认为我在这里做的事情根本上是错误的,但我看不出是什么。
感谢@ArtemBilan,我通过删除 BroadcastCapableChannel
@Bean
摆脱了警告
// @Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(connectionFactory)
.destination("dev/")
.get();
}
}
好的,我找到问题所在了
您使用了 @Component
和 @Bean
方法组合。此外,您尝试从一个 bean 方法调用另一个 bean 方法:.publishSubscribeChannel(jmsPublishSubscribeChannel()
。这在 @Configuration
class 之外是不可能的。带有 @Component
的注解配置被认为是“轻量级”的,因此我们不能相互调用 bean 方法——它们只是没有被代理以通过方法调用提供适当的依赖注入。
这个应该也适合你:
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(connectionFactory)
.destination("dev/")
.get();
}
@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel,
请阅读有关轻量级配置的更多信息和 proxyBeanMethods = false
:
https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-java-basic-concepts