Designing an email daily digest feature with Spring Integration

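I currently have an SI project that listens on a JMS queue, does some processing and, depending on the message, sends emails, writes files...

Now I would like to add a daily digest feature to the email-sending part. I want to store the emails somewhere and, once a day, group all messages that have the same destination address, concatenate their content, and send a single email.

What would be the best design for this? I looked at the aggregator concept, but I have a few questions:

Thanks for your help.

Cheers

EDIT

Here is what I am trying now: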

<int:aggregator id="templatingDailyAggregator"
    input-channel="templatingDailyAggregatorInputChannel" 
    output-channel="templatingDailyAggregatorOutputChannel"

    message-store="templatingEmailAggregatorStore"
    correlation-strategy-expression="headers['templatingEmailGroupingCategory']+payload.emailMessage.email"
    release-strategy-expression="false"

    send-partial-result-on-expiry="true"
    expire-groups-upon-completion="true"
    >

    <int:expire-transactional transaction-manager="templatingAggregatorStoreTransactionManager"/>
</int:aggregator>

<int-jdbc:message-store id="templatingEmailAggregatorStore" data-source="templatingEmailAggregatorStoreDataSource" />

<bean id="templatingAggregatorStoreTransactionManager"
    class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="templatingEmailAggregatorStoreDataSource" />
</bean>

<!-- MySQL DB DataSource -->
<bean id="templatingEmailAggregatorStoreDataSource"
    class="org.springframework.jdbc.datasource.DriverManagerDataSource">

    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="MYURL" />
    <property name="username" value="MYUSER" />
    <property name="password" value="MYPASS" />
</bean>

<bean id="templatingEmailAggregatorStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="templatingEmailAggregatorStore"/>
    <property name="timeout" value="10"/>
</bean>

<task:scheduled-tasks scheduler="templatingAggregatorScheduler">
    <task:scheduled ref="templatingEmailAggregatorStoreReaper" method="run" cron="0 */2 * * * * "/>
</task:scheduled-tasks>

<task:scheduler id="templatingAggregatorScheduler"/>

<int:transformer id="templatingDailyDigestTransformer" ref="templatingDailyDigestTransformerBean" input-channel="templatingDailyAggregatorOutputChannel" method="processMessage" output-channel="emailOutputChannel"/>

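There is one problem, though: if an exception occurs after the aggregation step (for example while sending the email), I want the consumption of the MessageGroup to be rolled back so that the group is still present in the database. That is not what happens: the MessageGroup is consumed and no longer exists in the database, so it is lost.

EDIT 2

I have added the XML for the transformer that follows the aggregator above. For now this transformer simply throws an exception to test the crash case. Here is the relevant part of the log, including the stack trace: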

DEBUG [exec-1] - Expiring all messages older than timeout=10 from message group store: org.springframework.integration.jdbc.JdbcMessageStore@3e082583
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT distinct GROUP_KEY as CREATED from INT_MESSAGE_GROUP where REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, MESSAGE_BYTES, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (SELECT MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY = ?) and REGION=? ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning BLOB as bytes
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE from INT_MESSAGE_GROUP where GROUP_KEY = ? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, MESSAGE_BYTES, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (SELECT MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY = ?) and REGION=? ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning BLOB as bytes
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE from INT_MESSAGE_GROUP where GROUP_KEY = ? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
INFO  [exec-1] - Expiring MessageGroup with correlationKey[fb90fe78-c3df-3793-9ee8-acae4924bebe]
DEBUG [exec-1] - Prematurely releasing partially complete group with key [fb90fe78-c3df-3793-9ee8-acae4924bebe] to: templatingDailyAggregatorOutputChannel
DEBUG [exec-1] - Completing group with correlationKey [fb90fe78-c3df-3793-9ee8-acae4924bebe]
DEBUG [exec-1] - org.springframework.integration.transformer.MessageTransformingHandler@2fdde28c received message: GenericMessage [payload=[ch.post.ehealth.extcom.templating.core.PreAggregatorEmailMessage@563b5cce], headers={jms_timestamp=1426090799820, extcomPluginDestination=templatingPluginInputChannel, extcomId=5782d059-c88d-44ae-82a1-0b738b43e821, jms_messageId=ID:some-vm-45363-1421229178337-3:9:1:1:4, timestamp=1426090920046, id=b8330bed-fc74-c8d5-6838-a3116a05ab39, history=jmsInputAdapter,inputChannel,xmlToSpringIntTransformer,pluginRouterChannel,pluginRouter,templatingPluginInputChannel,templatingTransformer,templatingPluginOutputChannel,templatingOutputRouter,templatingEmailOutputChannel,templatingEmailGroupingRouter,templatingPreAggregatorChannel,templatingPreAggregatorTransformer,templatingDailyAggregatorInputChannel,templatingDailyAggregator,templatingDailyAggregatorOutputChannel, JdbcMessageStore.CREATED_DATE=1426090800772, jms_type=, jms_redelivered=false, priority=0, templatingEmailGrouping=DAILY, jms_correlationId=, JdbcMessageStore.SAVED=true, templatingEmailGroupingCategory=DAILY}]
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [select MESSAGE_ID, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (select MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and REGION=?) ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, CREATED_DATE, MESSAGE_BYTES from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning BLOB as bytes
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Removing relationships for the group with group key=fb90fe78-c3df-3793-9ee8-acae4924bebe
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_MESSAGE_GROUP where GROUP_KEY=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Marking messages with group key=fb90fe78-c3df-3793-9ee8-acae4924bebe
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
ERROR [exec-1] - Exception in expiry callbackjava.lang.RuntimeException: test crash
    at ch.post.ehealth.extcom.templating.core.DailyDigestTransformer.processMessage(DailyDigestTransformer.java:33) ~[extcom-templating.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:63) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:95) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:44) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:258) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:84) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:112) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:111) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:64) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:68) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.integration.transformer.MessageTransformationException: ; nested exception is org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:74) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:657) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:642) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:619) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:543) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$ForceReleaseMessageGroupProcessor.processMessageGroup(AbstractCorrelatingMessageHandler.java:721) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.execute(AbstractCorrelatingMessageHandler.java:168) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:169) [spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:113) [spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:115) [spring-integration-core-4.1.2.RELEASE.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.7.0_71]
    at java.lang.Thread.run(Unknown Source) [na:1.7.0_71]

DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, MESSAGE_BYTES, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (SELECT MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY = ?) and REGION=? ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE from INT_MESSAGE_GROUP where GROUP_KEY = ? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Group expiry candidate (fb90fe78-c3df-3793-9ee8-acae4924bebe) has changed - it may be reconsidered for a future expiration
ERROR [exec-1] - Unexpected error occurred in scheduled task.java.lang.RuntimeException: test crash
    at ch.post.ehealth.extcom.templating.core.DailyDigestTransformer.processMessage(DailyDigestTransformer.java:33) ~[extcom-templating.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:63) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:95) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:44) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:258) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:84) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:112) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:111) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:64) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:68) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.integration.transformer.MessageTransformationException: ; nested exception is org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:74) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:657) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:642) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:619) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:543) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$ForceReleaseMessageGroupProcessor.processMessageGroup(AbstractCorrelatingMessageHandler.java:721) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.execute(AbstractCorrelatingMessageHandler.java:168) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:169) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:113) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:115) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65) ~[spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.7.0_71]
    at java.lang.Thread.run(Unknown Source) [na:1.7.0_71]

DEBUG [exec-1] - Expiring all messages older than timeout=10 from message group store: org.springframework.integration.jdbc.JdbcMessageStore@3e082583
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT distinct GROUP_KEY as CREATED from INT_MESSAGE_GROUP where REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource

Well, it looks like you are on the right track.

You can enclose the aggregator in a TX spanning JMS and JDBC, and group the messages by the desired correlationKey, e.g. with correlation-strategy-expression.

Since you don't want to release the group until some daily event (e.g. a cron trigger), you should mark your aggregator with these options:

release-strategy-expression="false"
send-partial-result-on-expiry="true"
expire-groups-upon-completion="true"
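  • The first: does not allow a normal release.
  • The second: sends the aggregator result to the output-channel on expiry, rather than to the discard-channel.
  • The third: removes the group from the MessageStore so that a new group can be formed for the same correlationKey.

To make this work, you should configure a MessageGroupStoreReaper: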

<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="10"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" cron="0 0 * * * * "/>
</task:scheduled-tasks>

<task:scheduler id="scheduler"/>

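The reaper invokes a MessageGroupCallback that is registered during aggregator initialization. That callback calls forceComplete on the aggregator, which is what lets all of the options mentioned above take effect.

Starting with SI 4.1, <aggregator> supports an <expire-transactional> sub-element, which allows forceComplete to be wrapped in a TX, as you asked.

Before that, we had to wrap the MessageGroupStoreReaper.run() method in a TX ourselves (e.g. using <tx:advice>).

Hope I am clear.

UPDATE

Sorry, I see that we have a bug. In the documentation for <expire-transactional> we say:

Allows a transaction to be started for the forceComplete operation. It is initiated from a group-timeout(-expression) or by a MessageGroupStoreReaper and is not applied to the normal add/release/discard operations. Only this sub-element or <expire-advice-chain/> is allowed.

But that is not the case. The code looks like: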

if (this.groupTimeoutExpression != null && !CollectionUtils.isEmpty(this.forceReleaseAdviceChain)) {
    ProxyFactory proxyFactory = new ProxyFactory(processor);
    for (Advice advice : this.forceReleaseAdviceChain) {
        proxyFactory.addAdvice(advice);
    }
    return (MessageGroupProcessor) proxyFactory.getProxy(getApplicationContext().getClassLoader());
}

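So we apply <expire-transactional> only when a group-timeout(-expression) is provided, and not for the MessageGroupStoreReaper.

Feel free to raise a JIRA issue on the matter and we will address it soon.

In the meantime, you should use <tx:advice> around MessageGroupStoreReaper.run().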
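
As a rough illustration of that workaround, the fragment below wraps the reaper's run() method in a transaction by combining <tx:advice> with an <aop:config> advisor. It is only a sketch under a few assumptions: the bean ids reaperTxAdvice and reaperRunPointcut are made up for this example, the transaction manager is the templatingAggregatorStoreTransactionManager bean from the question, the tx and aop namespaces are declared on the enclosing <beans> element, and aspectjweaver is on the classpath so that the pointcut expression can be parsed.

```xml
<!-- Sketch only: assumes xmlns:tx="http://www.springframework.org/schema/tx" and
     xmlns:aop="http://www.springframework.org/schema/aop" are declared on <beans>. -->

<!-- Hypothetical id; starts a transaction for any advised method named "run".
     By default the transaction rolls back on RuntimeException. -->
<tx:advice id="reaperTxAdvice"
    transaction-manager="templatingAggregatorStoreTransactionManager">
    <tx:attributes>
        <tx:method name="run"/>
    </tx:attributes>
</tx:advice>

<!-- Applies the advice to MessageGroupStoreReaper.run(), so the forceComplete
     triggered by the reaper and the downstream send share one transaction. -->
<aop:config>
    <aop:pointcut id="reaperRunPointcut"
        expression="execution(* org.springframework.integration.store.MessageGroupStoreReaper.run(..))"/>
    <aop:advisor advice-ref="reaperTxAdvice" pointcut-ref="reaperRunPointcut"/>
</aop:config>
```

With something like this in place, the existing <task:scheduled ref="templatingEmailAggregatorStoreReaper" method="run" .../> entry should end up invoking the proxied bean. The log in the question shows the "test crash" exception propagating out of MessageGroupStoreReaper.run(), so one would expect the JDBC deletes to be rolled back and the group to remain in the store for the next run. This only holds if the downstream flow stays on the reaper's thread (direct channels, no executor or queue channel in between).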