Spring Integration - Poller fires too often
I have the following code:
@Bean
public IntegrationFlow aggregatingFlow(
        AmqpInboundChannelAdapter aggregatorInboundAdapter,
        PollableChannel aggregatingChannel,
        AmqpOutboundEndpointEnhanced amqpOutboundEndpoint,
        PollSkipAdvice pollSkipAdvice) {
    return IntegrationFlows.from(aggregatorInboundAdapter)
            .wireTap(wtChannel())
            .channel(aggregatingChannel)
            .handle(
                    amqpOutboundEndpoint,
                    e ->
                            e.poller(
                                    Pollers.fixedDelay(1, TimeUnit.SECONDS, 1)
                                            .maxMessagesPerPoll(DEFAULT_MESSAGES_PER_POLL)
                                            .advice(pollSkipAdvice))
                                    .id("pollingConsumer"))
            .get();
}
Here, pollSkipAdvice is defined as:
@Bean
public PollSkipAdvice pollSkipAdvice() {
    return new PollSkipAdvice(
            new PollSkipStrategy() {
                int currentPoll = 1;

                @Override
                public synchronized boolean skipPoll() {
                    int hitRate = 0; //just not to add to the code; its dynamic
                    if (currentPoll >= hitRate) {
                        System.out.println(MessageFormat.format("{0} : Hit poll number {1} for message number of {2} | {3}",
                                DateTimeFormatter.ofPattern("HH:mm:ss. SSS")
                                        .withZone(ZoneOffset.UTC)
                                        .format(Instant.now()),
                                currentPoll,
                                currentMessagesInQueue, Thread.currentThread().getName()));
                        currentPoll = 1;
                        return false;
                    }
                    currentPoll++;
                    return true;
                }
            });
}
The problem I'm facing is that the poller is invoked more than once per second. This is the log I get when testing the code:
21:41:47. 652 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:47. 654 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=190)
21:41:47. 656 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=191)
21:41:47. 658 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:47. 660 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=192)
21:41:47. 662 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=193)
21:41:47. 665 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=194)
21:41:47. 667 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=195)
21:41:47. 669 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:47. 671 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=196)
EventChange(id=197)
EventChange(id=198)
EventChange(id=199)
21:41:48. 653 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:49. 656 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:50. 659 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:51. 661 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:52. 662 : Hit poll number 1 for message number of 0 | pool-3-thread-1
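It seems that whenever I receive something from AMQP, the poller fires many times. In the log you can see that between 21:41:47. 652 and 21:41:47. 671 it fired 10 times within 1 second. On the other hand, once that burst is over, it works normally (once per second).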
When I printed the stack trace for two of these calls, only a few milliseconds apart, I got this:
20:32:56. 406 : Hit poll number 2 for message number of 0 | pool-3-thread-1
java.lang.Thread.getStackTrace(Thread.java:1559)
com.a.configuration.component.EventAggregatorConfig.skipPoll(EventAggregatorConfig.java:313)
org.springframework.integration.scheduling.PollSkipAdvice.invoke(PollSkipAdvice.java:51)
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
com.sun.proxy.$Proxy136.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null(AbstractPollingEndpoint.java:275)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
20:32:56. 408 : Hit poll number 1 for message number of 0 | pool-3-thread-1
java.lang.Thread.getStackTrace(Thread.java:1559)
com.a.configuration.component.EventAggregatorConfig.skipPoll(EventAggregatorConfig.java:313)
org.springframework.integration.scheduling.PollSkipAdvice.invoke(PollSkipAdvice.java:51)
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
com.sun.proxy.$Proxy136.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null(AbstractPollingEndpoint.java:275)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
Did I configure something wrong? I tried defining a separate taskScheduler, but it doesn't seem to be used at all.
Update
After some analysis, this may be caused by this part of Spring's AbstractPollingEndpoint class:
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
if (pollForMessage() == null) {
break;
}
count++;
}
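So basically, even though I asked for one poll per second, it keeps polling within the same cycle for as long as messages are found (up to 10), and only stops early when a poll finds nothing?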
Update 2
Yes, it seems the skipPoll advice is called for every single message poll, not once per maxMessagesPerPoll batch.
I'm not sure what the problem is. The logic is exactly what we expect from this advice:
private Runnable createPoller() {
    return () ->
            this.taskExecutor.execute(() -> {
                int count = 0;
                while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                    if (this.maxMessagesPerPoll == 0) {
                        logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
                        break;
                    }
                    if (pollForMessage() == null) {
                        break;
                    }
                    count++;
                }
            });
}
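So if pollForMessage() returns null, we simply break out of the while() loop and end the current polling cycle. That null is returned by PollSkipAdvice when PollSkipStrategy.skipPoll() returns true (the poll is skipped), or by the poll itself when the channel has no message.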
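To make the loop above concrete, here is a minimal, Spring-free simulation of one polling cycle. It is only a sketch: PollCycleDemo, runCycle and the BooleanSupplier standing in for PollSkipStrategy are hypothetical names for illustration, not Spring Integration APIs. It shows that the skip strategy is consulted once per pollForMessage() call, so a cycle that finds 10 queued messages consults it 10 times.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BooleanSupplier;

class PollCycleDemo {

    /** Counts how often the (simulated) skip strategy is consulted. */
    static int strategyCalls = 0;

    /** Stand-in for the advised pollForMessage(): null when skipped or when the queue is empty. */
    static String pollForMessage(Queue<String> queue, BooleanSupplier skipPoll) {
        strategyCalls++;
        if (skipPoll.getAsBoolean()) {
            return null; // a skipped poll looks like "no message" to the loop
        }
        return queue.poll(); // null when nothing is queued
    }

    /** One trigger firing; same loop shape as the createPoller() snippet above. */
    static int runCycle(Queue<String> queue, int maxMessagesPerPoll, BooleanSupplier skipPoll) {
        int count = 0;
        while (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll) {
            if (pollForMessage(queue, skipPoll) == null) {
                break;
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        for (int i = 190; i < 200; i++) {
            queue.add("EventChange(id=" + i + ")");
        }
        // A strategy that never skips, like hitRate = 0 in the question.
        int handled = runCycle(queue, 10, () -> false);
        System.out.println("handled=" + handled + ", strategyCalls=" + strategyCalls);
        // prints "handled=10, strategyCalls=10"
    }
}
```

With fewer messages than maxMessagesPerPoll, the strategy is consulted one extra time: the final poll returns null and ends the cycle.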
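You have probably just confused a polling cycle with an actual poll. The first is the moment the trigger fires, together with its maxMessagesPerPoll. An actual poll is a single pollForMessage call. You may need to revise the logic of your custom strategy to fit the way this advice is applied.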
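One possible way to revise the strategy, sketched here under loud assumptions: since the advice runs per poll, the strategy can make its skip decision once per cycle and reuse it for the remaining polls of that cycle. The class below is hypothetical and uses a time-gap heuristic (calls closer together than cycleGapMillis are treated as the same cycle), which only holds if handling one message is much faster than the trigger interval. The injected clock exists purely to make the sketch testable; in a real application the class would implement PollSkipStrategy, whose skipPoll() method it mirrors.

```java
import java.util.function.LongSupplier;

class CycleAwareSkipStrategy {

    private final LongSupplier clockMillis; // assumption: injected clock, e.g. System::currentTimeMillis
    private final long cycleGapMillis;      // assumption: minimum gap that separates two cycles
    private final int hitRate;              // proceed on every hitRate-th cycle

    private boolean firstCall = true;
    private long lastCallMillis;
    private int currentCycle = 0;
    private boolean skipCurrentCycle = false;

    CycleAwareSkipStrategy(LongSupplier clockMillis, long cycleGapMillis, int hitRate) {
        this.clockMillis = clockMillis;
        this.cycleGapMillis = cycleGapMillis;
        this.hitRate = hitRate;
    }

    /** Same contract as PollSkipStrategy.skipPoll(): true means "skip this poll". */
    public synchronized boolean skipPoll() {
        long now = clockMillis.getAsLong();
        boolean newCycle = firstCall || now - lastCallMillis >= cycleGapMillis;
        firstCall = false;
        lastCallMillis = now;
        if (newCycle) {
            // Decide once per cycle; later polls in the same cycle reuse the decision.
            currentCycle++;
            if (currentCycle >= hitRate) {
                currentCycle = 0;
                skipCurrentCycle = false;
            } else {
                skipCurrentCycle = true;
            }
        }
        return skipCurrentCycle;
    }
}
```

For example, with a 1-second fixed delay one might construct it as new CycleAwareSkipStrategy(System::currentTimeMillis, 500, 3), so that only every third trigger firing actually drains the channel. A skipped cycle costs a single skipPoll() call, because the first null ends the loop.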