Message Group expiring 2 times
I have a Spring Integration flow in which I aggregate messages. I am using a timeout to expire the groups.
<int:aggregator input-channel="inputChannel" expire-groups-upon-completion="true" expire-groups-upon-timeout="true" discard-channel="timeoutChannel" group-timeout="10000" correlation-strategy-expression="headers['id']" output-channel="release"/>
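On timeoutChannel I send an email whenever a group times out. Sending the email fails (it cannot connect to the server, which is expected) and the failure is routed to the error channel, where I log the error.
What I can't figure out is why the group is expired twice, with the same log messages each time?
Error log: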
2021-05-27 17:53:46,213 DEBUG [task-scheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler:528 - Cancel 'forceComplete' scheduling for MessageGroup [ SimpleMessageGroup{groupId=d02b06cb-1c59-2d87-a3d6-080de89799e4,
2021-05-27 17:53:46,215 INFO [task-scheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler:632 - Expiring MessageGroup with correlationKey[d02b06cb-1c59-2d87-a3d6-080de89799e4]
2021-05-27 17:53:46,215 DEBUG [task-scheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler:643 - Discarding messages of partially complete group with key [d02b06cb-1c59-2d87-a3d6-080de89799e4] to: aggregatorTimeoutChannel
2021-05-27 17:53:46,215 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:413 - preSend on channel 'aggregatorTimeoutChannel',
2021-05-27 17:53:46,216 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:413 - preSend on channel 'aggregatorTimeoutAlert',
2021-05-27 17:53:46,218 DEBUG [task-scheduler-1] org.springframework.integration.filter.MessageFilter:115 - timeOutAlertMailChain$child#0.handler received message: GenericMessage [
2021-05-27 17:53:46,219 DEBUG [task-scheduler-1] org.springframework.integration.handler.ServiceActivatingHandler:115 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@18afd6e2] (timeOutAlertMailChain$child#1) received message:
2021-05-27 17:53:46,220 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#2.handler received message:
2021-05-27 17:53:46,221 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#3.handler received message:
2021-05-27 17:53:46,222 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#4.handler received message:
2021-05-27 17:53:46,222 DEBUG [task-scheduler-1] org.springframework.integration.transformer.MessageTransformingHandler:115 - timeOutAlertMailChain$child#5.handler received message:
2021-05-27 17:53:46,225 DEBUG [task-scheduler-1] org.springframework.integration.mail.MailSendingMessageHandler:115 - timeOutAlertMailChain$child#6.handler received message: GenericM
2021-05-27 17:53:48,238 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:413 - preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
2021-05-27 17:53:48,240 DEBUG [task-scheduler-1] org.springframework.integration.handler.MethodInvokingMessageHandler:115 - org.springframework.integration.handler.MethodInvokingMessageHandler#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect; message exceptions (1) are:
Failed message 1: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect, failedMessage=GenericMessage [payload=<?xml version="1.0" encoding="UTF-8"?><html xmlns="http://www.w3.org/TR/REC-html40" xmlns:o="urn:schemas-microsoft-com:office:office"><head><meta http-equiv="Content-Type" content="text/html; charset=us-ascii"/><meta name="Generator" content="Microsoft Word 14 (filtered medium)"/></head><body lang="EN-GB" link="blue" vlink="purple"><div class="WordSection1"><p class="MsoNormal"><span lang="EN-US" style="mso-fareast-language: EN-IE">
2021-05-27 17:53:48,240 ERROR [task-scheduler-1] com.examplet.ErrorHandler:26 - ERROR!!!
error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect; message exceptions (1) are:
Failed message 1: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect
2021-05-27 17:53:48,241 DEBUG [task-scheduler-1] org.springframework.integration.channel.DirectChannel:432 - postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [timeOutAlertMailChain$child#6.handler]; nested exception is org.springframework.mail.MailSendException: Mail server connection failed; nested exception is com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect. Failed messages: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
java.net.ConnectException: Connection refused: connect; message exceptions (1) are:
Failed message 1: com.sun.mail.util.MailConnectException: Couldn't connect to host, port: localhost, 25; timeout -1;
nested exception is:
You may want to consider emitting a regular output when the expiration timeout fires. As you have seen, send-partial-result-on-expiry="true" fixes the problem.
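Also consider setting expire-groups-upon-timeout to "false" instead of "true". That way the expired group is not removed from the store, and any late-arriving messages are discarded because the group is marked as complete.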
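Putting the two suggestions above together, the aggregator from the question might look like the sketch below. The channel names and other values are copied from the question; only send-partial-result-on-expiry is added and expire-groups-upon-timeout is flipped. One thing to keep in mind (my reading of the aggregator behaviour, worth verifying against the reference docs for your version): with send-partial-result-on-expiry="true" a timed-out group is sent to the output-channel as a partial result, so the discard-channel would then only see late-arriving messages.

```xml
<!-- Sketch only: same aggregator as in the question, with the two attribute changes discussed above -->
<int:aggregator input-channel="inputChannel"
                output-channel="release"
                discard-channel="timeoutChannel"
                correlation-strategy-expression="headers['id']"
                group-timeout="10000"
                send-partial-result-on-expiry="true"
                expire-groups-upon-completion="true"
                expire-groups-upon-timeout="false"/>
```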
I don't think you need to look into <int:expire-advice-chain>, given what we found with send-partial-result-on-expiry="true".
See the documentation for more on the aggregator and its features: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator
UPDATE
If you do want to use <int:expire-advice-chain>, you need to implement a MethodInterceptor like this:
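import java.util.Collection;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.integration.store.MessageGroup;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

public class ExpiredGroupNotificationMethodInterceptor implements MethodInterceptor {
    @Nullable
    @Override
    public Object invoke(@NonNull MethodInvocation invocation) throws Throwable {
        // The first argument of the advised call is the group being expired
        MessageGroup expiredGroup = (MessageGroup) invocation.getArguments()[0];
        Collection<Message<?>> partiallyReleasedMessages = expiredGroup.getMessages();
        // Do some notification here
        return invocation.proceed();
    }
}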
And add it as a <bean> in that chain:
<expire-advice-chain>
    <beans:bean class="ExpiredGroupNotificationMethodInterceptor"/>
</expire-advice-chain>
as a nested configuration of the <aggregator> tag.
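For orientation, here is roughly how that nesting could look on the aggregator from the question. This is a sketch under assumptions: it uses the int: prefix from the question and assumes the Spring beans namespace is the default one in the file (hence the unprefixed <bean>); adjust the prefixes and the interceptor's fully qualified class name to match your own configuration.

```xml
<!-- Sketch only: the self-closing aggregator becomes a container for the advice chain -->
<int:aggregator input-channel="inputChannel"
                output-channel="release"
                discard-channel="timeoutChannel"
                correlation-strategy-expression="headers['id']"
                group-timeout="10000"
                expire-groups-upon-completion="true"
                expire-groups-upon-timeout="true">
    <int:expire-advice-chain>
        <!-- Assumes the interceptor class is resolvable under this name -->
        <bean class="ExpiredGroupNotificationMethodInterceptor"/>
    </int:expire-advice-chain>
</int:aggregator>
```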