延迟入站适配器和控制总线
Delayed Inbound Adapter and Control Bus
我的集成流程代码是:
@Bean
public IntegrationFlow messageFlow() {
return IntegrationFlows.from(stompInboundChannelAdapter())
.transform(inBoundStompMsgTransformer::transform)
.headerFilter("stomp_subscription","content-length")
.handle(Amqp.outboundAdapter(outboundConfiguration.rabbitTemplate()))
.get();
}
我正在使用 Spring 启动。
清除日志指出 {transformer}
订阅者已添加到输入频道
2019-12-09 18:21:41.752 INFO 18248 --- [ main] o.s.i.s.i.StompInboundChannelAdapter : started bean 'stompInboundChannelAdapter'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@21e360a'
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {transformer} as a subscriber to the 'stompInputChannel' channel
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.stompInputChannel' has 1 subscriber(s).
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {header-filter} as a subscriber to the 'inboundFlow.channel#0' channel
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.inboundFlow.channel#0' has 1 subscriber(s).
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'inboundFlow.channel#1' channel
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.inboundFlow.channel#1' has 1 subscriber(s).
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
但是,我得到一个异常,我丢失了队列中的第一个 one/two 消息。它处理剩余的消息。
假设在启动应用程序之前队列中有 10 条消息。启动应用程序后,即使日志显示订阅者已添加且 bean 已启动,我仍收到异常,post 异常,处理了 8/9 条消息。
例外情况是:org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.stompInputChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage
很明显,上下文还没有完全准备好处理消息,因此出现了异常。 但日志消息具有误导性。
我的第一个问题:
- 那么添加订阅者并启动 bean 到底意味着什么?这是否意味着一切都已设置,但上下文仍必须准备好处理消息?
为了克服这个问题,正如许多 post 中所建议的那样,我使用控制总线来启动适配器。代码是:
......
@Component
public class ApplicationLifeCycle implements SmartLifecycle {
@Autowired
private MessageChannel controlBusChannel;
@Override
public void start() {
System.out.println("Service starting...");
controlBusChannel.send(new GenericMessage<>("@stompInboundChannelAdapter.start()"));
}
.....
我创建了 public class ApplicationLifeCycle implements SmartLifecycle
认为它会很方便。
我的第二个问题是:
- 这是使用控制总线处理 start/stop 适配器的 right/best 方式吗?如果方法不对,请告诉我正确的方法。
谢谢,
马赫什
我认为这是你另一个问题的延续:IntegrationFlow Amqp Channel Adapter is not working in handle()
你有这个:
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(stompSessionManager(), "/queue/myQueue");
adapter.setOutputChannel(stompInputChannel());
adapter.setPayloadType(ByteString.class);
return adapter;
}
您没有在此处显示。
问题是您随后在 IntegrationFlow
中使用了相同的定义。事实证明,StompInboundChannelAdapter
bean 较早启动,然后处理 IntegationFlow
,并订阅 .transform(inBoundStompMsgTransformer::transform)
以处理传入消息。
因此,如果您从 stompInboundChannelAdapter()
中删除 @Bean
,它应该可以正常工作。待会儿再看看为什么MessageProducerSupport
早点开始,然后IntegrationFlow
s...
我的集成流程代码是:
@Bean
public IntegrationFlow messageFlow() {
return IntegrationFlows.from(stompInboundChannelAdapter())
.transform(inBoundStompMsgTransformer::transform)
.headerFilter("stomp_subscription","content-length")
.handle(Amqp.outboundAdapter(outboundConfiguration.rabbitTemplate()))
.get();
}
我正在使用 Spring 启动。
清除日志指出 {transformer}
订阅者已添加到输入频道
2019-12-09 18:21:41.752 INFO 18248 --- [ main] o.s.i.s.i.StompInboundChannelAdapter : started bean 'stompInboundChannelAdapter'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@21e360a'
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {transformer} as a subscriber to the 'stompInputChannel' channel
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.stompInputChannel' has 1 subscriber(s).
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.768 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {header-filter} as a subscriber to the 'inboundFlow.channel#0' channel
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.inboundFlow.channel#0' has 1 subscriber(s).
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'inboundFlow.channel#1' channel
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.inboundFlow.channel#1' has 1 subscriber(s).
2019-12-09 18:21:41.772 INFO 18248 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
但是,我得到一个异常,我丢失了队列中的第一个 one/two 消息。它处理剩余的消息。
假设在启动应用程序之前队列中有 10 条消息。启动应用程序后,即使日志显示订阅者已添加且 bean 已启动,我仍收到异常,post 异常,处理了 8/9 条消息。
例外情况是:org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.stompInputChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage
很明显,上下文还没有完全准备好处理消息,因此出现了异常。 但日志消息具有误导性。
我的第一个问题:
- 那么添加订阅者并启动 bean 到底意味着什么?这是否意味着一切都已设置,但上下文仍必须准备好处理消息?
为了克服这个问题,正如许多 post 中所建议的那样,我使用控制总线来启动适配器。代码是:
......
@Component
public class ApplicationLifeCycle implements SmartLifecycle {
@Autowired
private MessageChannel controlBusChannel;
@Override
public void start() {
System.out.println("Service starting...");
controlBusChannel.send(new GenericMessage<>("@stompInboundChannelAdapter.start()"));
}
.....
我创建了 public class ApplicationLifeCycle implements SmartLifecycle
认为它会很方便。
我的第二个问题是:
- 这是使用控制总线处理 start/stop 适配器的 right/best 方式吗?如果方法不对,请告诉我正确的方法。
谢谢,
马赫什
我认为这是你另一个问题的延续:IntegrationFlow Amqp Channel Adapter is not working in handle()
你有这个:
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(stompSessionManager(), "/queue/myQueue");
adapter.setOutputChannel(stompInputChannel());
adapter.setPayloadType(ByteString.class);
return adapter;
}
您没有在此处显示。
问题是您随后在 IntegrationFlow
中使用了相同的定义。事实证明,StompInboundChannelAdapter
bean 较早启动,然后处理 IntegationFlow
,并订阅 .transform(inBoundStompMsgTransformer::transform)
以处理传入消息。
因此,如果您从 stompInboundChannelAdapter()
中删除 @Bean
,它应该可以正常工作。待会儿再看看为什么MessageProducerSupport
早点开始,然后IntegrationFlow
s...