为什么 "orderly shutdown" 不停止可轮询消息源?
why "orderly shutdown" doesn't stop pollable message sources?
我很难理解为什么 IntegrationMBeanExporter.stopActiveComponents 方法不能像下面这样停止轮询器。
在调用 stopActiveComponents 后不停止 below integrationflow 产生新消息的理由是什么?
(当然我可以自己手动关闭那个轮询器,但为什么框架不处理它?)
有什么方法可以修改下面的代码,以便轮询器在调用 stopActiveComponents 方法后自动停止发送新消息?
IntegrationFlows
.from(
() -> MessageBuilder.<Object>withPayload("").build(),
c -> c.poller(Pollers.fixedDelay(Duration.ofSeconds(10)))
)
.transform(p -> p)
.log(Level.INFO, m-> "done")
.get()
)
非常感谢您抽出宝贵的时间和专业知识。
最好的问候
.from(
() -> MessageBuilder.<Object>withPayload("").build(),
在目标 SourcePollingChannelAdapter
中生成普通 MessageSource
:
SourcePollingChannelAdapter spca = new SourcePollingChannelAdapter();
spca.setSource(this.source);
(参见 SourcePollingChannelAdapterFactoryBean
)。
IntegrationMBeanExporter
反过来期望:
else if (bean instanceof SourcePollingChannelAdapter) {
SourcePollingChannelAdapter pollingChannelAdapter = (SourcePollingChannelAdapter) bean;
MessageSource<?> messageSource = pollingChannelAdapter.getMessageSource();
if (messageSource instanceof IntegrationInboundManagement) {
IntegrationInboundManagement monitor = (IntegrationInboundManagement) extractTarget(messageSource);
registerSource(monitor);
this.sourceLifecycles.put(monitor, pollingChannelAdapter);
this.runtimeBeans.add(monitor);
return;
}
}
因此,只有 IntegrationInboundManagement
impl 被认为是最终停止的生命周期候选者。
要修复您的代码,只需执行以下操作:
.fromSupplier(
() -> "",
c -> c.poller(Pollers.fixedDelay(Duration.ofSeconds(10)))
)
框架将为您进行适当的包装!
我很难理解为什么 IntegrationMBeanExporter.stopActiveComponents 方法不能像下面这样停止轮询器。
在调用 stopActiveComponents 后不停止 below integrationflow 产生新消息的理由是什么?
(当然我可以自己手动关闭那个轮询器,但为什么框架不处理它?)
有什么方法可以修改下面的代码,以便轮询器在调用 stopActiveComponents 方法后自动停止发送新消息?
IntegrationFlows
.from(
() -> MessageBuilder.<Object>withPayload("").build(),
c -> c.poller(Pollers.fixedDelay(Duration.ofSeconds(10)))
)
.transform(p -> p)
.log(Level.INFO, m-> "done")
.get()
)
非常感谢您抽出宝贵的时间和专业知识。
最好的问候
.from(
() -> MessageBuilder.<Object>withPayload("").build(),
在目标 SourcePollingChannelAdapter
中生成普通 MessageSource
:
SourcePollingChannelAdapter spca = new SourcePollingChannelAdapter();
spca.setSource(this.source);
(参见 SourcePollingChannelAdapterFactoryBean
)。
IntegrationMBeanExporter
反过来期望:
else if (bean instanceof SourcePollingChannelAdapter) {
SourcePollingChannelAdapter pollingChannelAdapter = (SourcePollingChannelAdapter) bean;
MessageSource<?> messageSource = pollingChannelAdapter.getMessageSource();
if (messageSource instanceof IntegrationInboundManagement) {
IntegrationInboundManagement monitor = (IntegrationInboundManagement) extractTarget(messageSource);
registerSource(monitor);
this.sourceLifecycles.put(monitor, pollingChannelAdapter);
this.runtimeBeans.add(monitor);
return;
}
}
因此,只有 IntegrationInboundManagement
impl 被认为是最终停止的生命周期候选者。
要修复您的代码,只需执行以下操作:
.fromSupplier(
() -> "",
c -> c.poller(Pollers.fixedDelay(Duration.ofSeconds(10)))
)
框架将为您进行适当的包装!