为什么 "orderly shutdown" 不停止可轮询消息源?

why "orderly shutdown" doesn't stop pollable message sources?

我很难理解为什么 IntegrationMBeanExporter.stopActiveComponents 方法不能像下面这样停止轮询器。
在调用 stopActiveComponents 后不停止 below integrationflow 产生新消息的理由是什么?
(当然我可以自己手动关闭那个轮询器,但为什么框架不处理它?)
有什么方法可以修改下面的代码,以便轮询器在调用 stopActiveComponents 方法后自动停止发送新消息?

IntegrationFlows
    .from(
        () -> MessageBuilder.<Object>withPayload("").build(),
        c -> c.poller(Pollers.fixedDelay(Duration.ofSeconds(10)))
    )
    .transform(p -> p)
    .log(Level.INFO, m-> "done")
    .get()
)

非常感谢您抽出宝贵的时间和专业知识。
最好的问候

.from(
    () -> MessageBuilder.<Object>withPayload("").build(),

在目标 SourcePollingChannelAdapter 中生成普通 MessageSource:

        SourcePollingChannelAdapter spca = new SourcePollingChannelAdapter();
        spca.setSource(this.source);

(参见 SourcePollingChannelAdapterFactoryBean)。

IntegrationMBeanExporter 反过来期望:

else if (bean instanceof SourcePollingChannelAdapter) {
        SourcePollingChannelAdapter pollingChannelAdapter = (SourcePollingChannelAdapter) bean;
        MessageSource<?> messageSource = pollingChannelAdapter.getMessageSource();
        if (messageSource instanceof IntegrationInboundManagement) {
            IntegrationInboundManagement monitor = (IntegrationInboundManagement) extractTarget(messageSource);
            registerSource(monitor);
            this.sourceLifecycles.put(monitor, pollingChannelAdapter);
            this.runtimeBeans.add(monitor);
            return;
        }
    }

因此,只有 IntegrationInboundManagement impl 被认为是最终停止的生命周期候选者。

要修复您的代码,只需执行以下操作:

.fromSupplier(
    () -> "",
    c -> c.poller(Pollers.fixedDelay(Duration.ofSeconds(10)))
)

框架将为您进行适当的包装!