Pub/Sub Spring 集成队列通道未定义轮询器
Pub/Sub Spring integration Queue Channel no poller defined
我有一个连接到 pub/sub 并在 pub/sub 订阅发布时处理消息的应用程序。我希望能够将这些消息放入队列通道以避免一次处理大量消息。但是,当我尝试添加队列通道时出现以下错误?所以我看到的方式是,一条消息到达 inboundChannelAdaptor,将消息输出到 Queue 通道,然后 messageReciever 拉取并操作 QueueChannel 中的消息?
java.lang.IllegalArgumentException: No poller has been defined for Annotation-based endpoint, and no default poller is available within the context.
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.configurePollingEndpoint(AbstractMethodAnnotationPostProcessor.java:435) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.doCreateEndpoint(AbstractMethodAnnotationPostProcessor.java:377) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.createEndpoint(AbstractMethodAnnotationPostProcessor.java:367) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.postProcess(AbstractMethodAnnotationPostProcessor.java:172) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.postProcessMethodAndRegisterEndpointIfAny(MessagingAnnotationPostProcessor.java:230) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.lambda$processAnnotationTypeOnMethod(MessagingAnnotationPostProcessor.java:220) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) ~[na:na]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.afterSingletonsInstantiated(MessagingAnnotationPostProcessor.java:141) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:914) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at Application.main(Application.java:65) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.3.3.RELEASE.jar:2.3.3.RELEASE]
这是我的实现:
@Bean
public MessageChannel inputMessageQueueChannel() {
return new QueueChannel(50);
}
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
@Qualifier("inputMessageQueueChannel") MessageChannel messageChannel,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubSubTemplate,'sub-one');
adapter.setOutputChannel(messageChannel);
adapter.setAckMode(AckMode.MANUAL);
adapter.setPayloadType(String.class);
return adapter;
}
@ServiceActivator(inputChannel = "inputMessageQueueChannel")
public void messageReceiver(
String payload,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
log.info("Payload: " + payload);
message.ack();
}
队列通道本身不执行任何操作,除非它在内部将发送的消息存储到消息存储中。为了能够从该队列接收消息,您需要定期 轮询 这样的队列。为此目的,使用了 PollingConsumer
模式,但它不能自己做任何事情:你需要告诉它如何轮询该队列。因此,必须提供轮询器。请参阅该 @ServiceActivator
注释的 poller
属性。或者您可以通过 PollerMetadata
bean 定义提供一个全局默认值,名称为 PollerMetadata.DEFAULT_POLLER
。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer.
另请参阅此问题及其答案:Spring Integration No poller has been defined for endpoint
我有一个连接到 pub/sub 并在 pub/sub 订阅发布时处理消息的应用程序。我希望能够将这些消息放入队列通道以避免一次处理大量消息。但是,当我尝试添加队列通道时出现以下错误?所以我看到的方式是,一条消息到达 inboundChannelAdaptor,将消息输出到 Queue 通道,然后 messageReciever 拉取并操作 QueueChannel 中的消息?
java.lang.IllegalArgumentException: No poller has been defined for Annotation-based endpoint, and no default poller is available within the context.
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.configurePollingEndpoint(AbstractMethodAnnotationPostProcessor.java:435) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.doCreateEndpoint(AbstractMethodAnnotationPostProcessor.java:377) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.createEndpoint(AbstractMethodAnnotationPostProcessor.java:367) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.postProcess(AbstractMethodAnnotationPostProcessor.java:172) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.postProcessMethodAndRegisterEndpointIfAny(MessagingAnnotationPostProcessor.java:230) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.lambda$processAnnotationTypeOnMethod(MessagingAnnotationPostProcessor.java:220) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) ~[na:na]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.afterSingletonsInstantiated(MessagingAnnotationPostProcessor.java:141) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:914) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at Application.main(Application.java:65) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.3.3.RELEASE.jar:2.3.3.RELEASE]
这是我的实现:
@Bean
public MessageChannel inputMessageQueueChannel() {
return new QueueChannel(50);
}
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
@Qualifier("inputMessageQueueChannel") MessageChannel messageChannel,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubSubTemplate,'sub-one');
adapter.setOutputChannel(messageChannel);
adapter.setAckMode(AckMode.MANUAL);
adapter.setPayloadType(String.class);
return adapter;
}
@ServiceActivator(inputChannel = "inputMessageQueueChannel")
public void messageReceiver(
String payload,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
log.info("Payload: " + payload);
message.ack();
}
队列通道本身不执行任何操作,除非它在内部将发送的消息存储到消息存储中。为了能够从该队列接收消息,您需要定期 轮询 这样的队列。为此目的,使用了 PollingConsumer
模式,但它不能自己做任何事情:你需要告诉它如何轮询该队列。因此,必须提供轮询器。请参阅该 @ServiceActivator
注释的 poller
属性。或者您可以通过 PollerMetadata
bean 定义提供一个全局默认值,名称为 PollerMetadata.DEFAULT_POLLER
。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer.
另请参阅此问题及其答案:Spring Integration No poller has been defined for endpoint