如何在运行时在 Http 出站网关内更改轮询器频率?
How to change poller frequency at runtime within a Http outbound gateway?
我的场景是一个 Http 出站网关,我在其中请求外部服务进行由 TransferRequest
实体表示的下一个转换。网关是“httpOutRequest”通道的端点。 “httpOutRequest”通道的起点是一个 bean IntegrationFlow source()
,我在其中发送由轮询器触发的空字符串消息。 (顺便说一句:这有必要吗?我可以将轮询器直接添加到出站网关吗?怎么做?)
然后我安装了 errorHandler 通道端点来捕获任何问题。如果问题(异常)的数量是 MAX_COUNT_TO_REDUCE_POLLING
- 假设是因为无法访问外部服务 - 那么我想在运行时将轮询从最初的 5_000 减少到 60_000。
到目前为止,这是我的代码:
public static final int MAX_COUNT_TO_REDUCE_POLLING = 3;
private long period = 5000;
private int problemCounter = 0;
@Bean
public IntegrationFlow outbound() {
return IntegrationFlows.from("httpOutRequest")
.handle(Http.outboundGateway("http://localhost:8080/harry-potter-service/next/request")
.httpMethod(HttpMethod.GET)
.expectedResponseType(TransferRequest.class)
)
.channel("reply")
.get();
}
@Bean
public IntegrationFlow source() {
return IntegrationFlows.from(
() -> new GenericMessage<String>(""),
e -> e.poller(p -> p.fixedRate(period)))
.channel("httpOutRequest")
.get();
}
@Bean
@ServiceActivator(inputChannel = "reply")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("myHandler: " + message.getPayload());
System.out.println("myHandler: " + message.getHeaders());
TransferRequest req = (TransferRequest) message.getPayload();
System.out.println("myHandler: " + req);
}
};
}
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler errorHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
LOG.error("message.payload: " + message.getPayload());
MessageHandlingException e = (MessageHandlingException) message.getPayload();
LOG.error("Exception: " + e);
LOG.debug("exception counter = " + (++problemCounter));
if (problemCounter >= MAX_COUNT_TO_REDUCE_POLLING) {
LOG.debug("would like to reduce poller frequence or stop");
period = 60_000;
// outbound().stop()
}
}
};
}
当遇到异常阈值时,如何在运行时降低轮询频率?
我怎么能停止集成流程?
编辑 1
更具体:如果我有消息传递网关
@Bean
public IntegrationFlow source() {
return IntegrationFlows.from(
() -> new GenericMessage<String>(""),
e -> e.poller(p -> p.fixedRate(period)))
.channel("httpOutRequest")
.get();
}
如何访问第二个 Lambda 中的 p?
如何使用 Control Channel
设置 p.fixedRate
?
我可能已经自己解决了这个问题,阅读手册。
请参阅 here 以在运行时更改轮询速率。为此,您必须使用 org.springframework.integration.util
包中的 DynamicPeriodiyTimer
。
要替换您的延迟轮询器,请执行:
private final DynamicPeriodicTrigger dynamicPeriodicTrigger =
new DynamicPeriodicTrigger(5_000);
@Bean
public IntegrationFlow normalStateEntryPoint() {
return IntegrationFlows.from(
() -> new GenericMessage<String>(""),
e -> e.poller(p -> p.trigger(dynamicPeriodicTrigger))
.id("normalStateSourcePollingChannelAdapter")
.autoStartup(true))
.channel("httpOutRequest")
.get();
}
要将轮询时间从 5.000 毫秒减少到 60.000 毫秒,请执行以下操作:
dynamicPeriodicTrigger.setPeriod(60_000);
就是这样。
我的场景是一个 Http 出站网关,我在其中请求外部服务进行由 TransferRequest
实体表示的下一个转换。网关是“httpOutRequest”通道的端点。 “httpOutRequest”通道的起点是一个 bean IntegrationFlow source()
,我在其中发送由轮询器触发的空字符串消息。 (顺便说一句:这有必要吗?我可以将轮询器直接添加到出站网关吗?怎么做?)
然后我安装了 errorHandler 通道端点来捕获任何问题。如果问题(异常)的数量是 MAX_COUNT_TO_REDUCE_POLLING
- 假设是因为无法访问外部服务 - 那么我想在运行时将轮询从最初的 5_000 减少到 60_000。
到目前为止,这是我的代码:
public static final int MAX_COUNT_TO_REDUCE_POLLING = 3;
private long period = 5000;
private int problemCounter = 0;
@Bean
public IntegrationFlow outbound() {
return IntegrationFlows.from("httpOutRequest")
.handle(Http.outboundGateway("http://localhost:8080/harry-potter-service/next/request")
.httpMethod(HttpMethod.GET)
.expectedResponseType(TransferRequest.class)
)
.channel("reply")
.get();
}
@Bean
public IntegrationFlow source() {
return IntegrationFlows.from(
() -> new GenericMessage<String>(""),
e -> e.poller(p -> p.fixedRate(period)))
.channel("httpOutRequest")
.get();
}
@Bean
@ServiceActivator(inputChannel = "reply")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("myHandler: " + message.getPayload());
System.out.println("myHandler: " + message.getHeaders());
TransferRequest req = (TransferRequest) message.getPayload();
System.out.println("myHandler: " + req);
}
};
}
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler errorHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
LOG.error("message.payload: " + message.getPayload());
MessageHandlingException e = (MessageHandlingException) message.getPayload();
LOG.error("Exception: " + e);
LOG.debug("exception counter = " + (++problemCounter));
if (problemCounter >= MAX_COUNT_TO_REDUCE_POLLING) {
LOG.debug("would like to reduce poller frequence or stop");
period = 60_000;
// outbound().stop()
}
}
};
}
当遇到异常阈值时,如何在运行时降低轮询频率?
我怎么能停止集成流程?
编辑 1
更具体:如果我有消息传递网关
@Bean
public IntegrationFlow source() {
return IntegrationFlows.from(
() -> new GenericMessage<String>(""),
e -> e.poller(p -> p.fixedRate(period)))
.channel("httpOutRequest")
.get();
}
如何访问第二个 Lambda 中的 p?
如何使用 Control Channel
设置 p.fixedRate
?
我可能已经自己解决了这个问题,阅读手册。
请参阅 here 以在运行时更改轮询速率。为此,您必须使用 org.springframework.integration.util
包中的 DynamicPeriodiyTimer
。
要替换您的延迟轮询器,请执行:
private final DynamicPeriodicTrigger dynamicPeriodicTrigger =
new DynamicPeriodicTrigger(5_000);
@Bean
public IntegrationFlow normalStateEntryPoint() {
return IntegrationFlows.from(
() -> new GenericMessage<String>(""),
e -> e.poller(p -> p.trigger(dynamicPeriodicTrigger))
.id("normalStateSourcePollingChannelAdapter")
.autoStartup(true))
.channel("httpOutRequest")
.get();
}
要将轮询时间从 5.000 毫秒减少到 60.000 毫秒,请执行以下操作:
dynamicPeriodicTrigger.setPeriod(60_000);
就是这样。