如何在运行时在 Http 出站网关内更改轮询器频率?

How to change poller frequency at runtime within a Http outbound gateway?

我的场景是一个 Http 出站网关,我在其中请求外部服务进行由 TransferRequest 实体表示的下一个转换。网关是“httpOutRequest”通道的端点。 “httpOutRequest”通道的起点是一个 bean IntegrationFlow source(),我在其中发送由轮询器触发的空字符串消息。 (顺便说一句:这有必要吗?我可以将轮询器直接添加到出站网关吗?怎么做?)

然后我安装了 errorHandler 通道端点来捕获任何问题。如果问题(异常)的数量是 MAX_COUNT_TO_REDUCE_POLLING - 假设是因为无法访问外部服务 - 那么我想在运行时将轮询从最初的 5_000 减少到 60_000。

到目前为止,这是我的代码:

    public static final int MAX_COUNT_TO_REDUCE_POLLING = 3;

    private long period = 5000;
    private int problemCounter = 0;

    @Bean
    public IntegrationFlow outbound() {
        return IntegrationFlows.from("httpOutRequest")
                .handle(Http.outboundGateway("http://localhost:8080/harry-potter-service/next/request")
                        .httpMethod(HttpMethod.GET)
                        .expectedResponseType(TransferRequest.class)
                        )
                .channel("reply")
                .get();
    }

    @Bean
    public IntegrationFlow source() {
        return IntegrationFlows.from(
                () -> new GenericMessage<String>(""),
                        e -> e.poller(p -> p.fixedRate(period)))
                .channel("httpOutRequest")
                .get();
    }

    @Bean
    @ServiceActivator(inputChannel = "reply")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("myHandler: " + message.getPayload());
                System.out.println("myHandler: " + message.getHeaders());
                TransferRequest req = (TransferRequest) message.getPayload();
                System.out.println("myHandler: " + req);
            }
        };
    }

    @Bean
    @ServiceActivator(inputChannel = "errorChannel")
    public MessageHandler errorHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                LOG.error("message.payload: " + message.getPayload());
                MessageHandlingException e = (MessageHandlingException) message.getPayload();
                LOG.error("Exception: " + e);
                LOG.debug("exception counter = " + (++problemCounter));

                if (problemCounter >= MAX_COUNT_TO_REDUCE_POLLING) {
                    LOG.debug("would like to reduce poller frequence or stop");
                    period = 60_000;
                //  outbound().stop()
                }
            }
        };
    }

当遇到异常阈值时,如何在运行时降低轮询频率?

我怎么能停止集成流程?

编辑 1

更具体:如果我有消息传递网关

@Bean
public IntegrationFlow source() {
    return IntegrationFlows.from(
            () -> new GenericMessage<String>(""),
                    e -> e.poller(p -> p.fixedRate(period)))
            .channel("httpOutRequest")
            .get();
}

如何访问第二个 Lambda 中的 p?

如何使用 Control Channel 设置 p.fixedRate

我可能已经自己解决了这个问题,阅读手册。

请参阅 here 以在运行时更改轮询速率。为此,您必须使用 org.springframework.integration.util 包中的 DynamicPeriodiyTimer

要替换您的延迟轮询器,请执行:

    private final DynamicPeriodicTrigger dynamicPeriodicTrigger =
            new DynamicPeriodicTrigger(5_000);

    @Bean
    public IntegrationFlow normalStateEntryPoint() {
        return IntegrationFlows.from(
                () -> new GenericMessage<String>(""),
                        e -> e.poller(p -> p.trigger(dynamicPeriodicTrigger))
                        .id("normalStateSourcePollingChannelAdapter")
                        .autoStartup(true))
                .channel("httpOutRequest")
                .get();
    }

要将轮询时间从 5.000 毫秒减少到 60.000 毫秒,请执行以下操作:

    dynamicPeriodicTrigger.setPeriod(60_000);

就是这样。