Design an email daily digest feature with Spring Integration
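I currently have an SI project that listens on a JMS queue, does some processing and, depending on the message, sends emails, writes files...
Now I'd like to add a daily digest feature to the email-sending part. I want to store the emails somewhere and, once a day, group all the messages with the same destination email address, concatenate their contents and send a single email.
What would be the best design solution?
I looked at the aggregator concept, but I have a few questions:
- I need a persistent store for the aggregated messages. They are sent only once a day, so I don't want to lose any. I guess I should use JdbcMessageStore?
- I need a transaction from the input queue -> aggregator, and then another transaction from the aggregator to the email output. Is that possible? If so, how should I configure it?
Thanks for your help.
Cheers
EDIT
Here is what I'm trying now: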
<int:aggregator id="templatingDailyAggregator"
input-channel="templatingDailyAggregatorInputChannel"
output-channel="templatingDailyAggregatorOutputChannel"
message-store="templatingEmailAggregatorStore"
correlation-strategy-expression="headers['templatingEmailGroupingCategory']+payload.emailMessage.email"
release-strategy-expression="false"
send-partial-result-on-expiry="true"
expire-groups-upon-completion="true"
>
<int:expire-transactional transaction-manager="templatingAggregatorStoreTransactionManager"/>
</int:aggregator>
<int-jdbc:message-store id="templatingEmailAggregatorStore" data-source="templatingEmailAggregatorStoreDataSource" />
<bean id="templatingAggregatorStoreTransactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="templatingEmailAggregatorStoreDataSource" />
</bean>
<!-- MySQL DB DataSource -->
<bean id="templatingEmailAggregatorStoreDataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="MYURL" />
<property name="username" value="MYUSER" />
<property name="password" value="MYPASS" />
</bean>
<bean id="templatingEmailAggregatorStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="templatingEmailAggregatorStore"/>
<property name="timeout" value="10"/>
</bean>
<task:scheduled-tasks scheduler="templatingAggregatorScheduler">
<task:scheduled ref="templatingEmailAggregatorStoreReaper" method="run" cron="0 */2 * * * * "/>
</task:scheduled-tasks>
<task:scheduler id="templatingAggregatorScheduler"/>
<int:transformer id="templatingDailyDigestTransformer" ref="templatingDailyDigestTransformerBean" input-channel="templatingDailyAggregatorOutputChannel" method="processMessage" output-channel="emailOutputChannel"/>
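But there is a problem: if an exception occurs after the aggregation step (for example while sending the email), I would expect the consumption of the MessageGroup to be rolled back so that it still exists in the database. That is not what happens: the MessageGroup is consumed, no longer exists in the database, and is therefore lost.
EDIT 2
I added the transformer XML that follows the aggregator above. For now this transformer just throws an exception to test the crash case.
Here is the stack trace I picked out: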
DEBUG [exec-1] - Expiring all messages older than timeout=10 from message group store: org.springframework.integration.jdbc.JdbcMessageStore@3e082583
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT distinct GROUP_KEY as CREATED from INT_MESSAGE_GROUP where REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, MESSAGE_BYTES, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (SELECT MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY = ?) and REGION=? ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning BLOB as bytes
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE from INT_MESSAGE_GROUP where GROUP_KEY = ? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, MESSAGE_BYTES, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (SELECT MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY = ?) and REGION=? ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning BLOB as bytes
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE from INT_MESSAGE_GROUP where GROUP_KEY = ? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
INFO [exec-1] - Expiring MessageGroup with correlationKey[fb90fe78-c3df-3793-9ee8-acae4924bebe]
DEBUG [exec-1] - Prematurely releasing partially complete group with key [fb90fe78-c3df-3793-9ee8-acae4924bebe] to: templatingDailyAggregatorOutputChannel
DEBUG [exec-1] - Completing group with correlationKey [fb90fe78-c3df-3793-9ee8-acae4924bebe]
DEBUG [exec-1] - org.springframework.integration.transformer.MessageTransformingHandler@2fdde28c received message: GenericMessage [payload=[ch.post.ehealth.extcom.templating.core.PreAggregatorEmailMessage@563b5cce], headers={jms_timestamp=1426090799820, extcomPluginDestination=templatingPluginInputChannel, extcomId=5782d059-c88d-44ae-82a1-0b738b43e821, jms_messageId=ID:some-vm-45363-1421229178337-3:9:1:1:4, timestamp=1426090920046, id=b8330bed-fc74-c8d5-6838-a3116a05ab39, history=jmsInputAdapter,inputChannel,xmlToSpringIntTransformer,pluginRouterChannel,pluginRouter,templatingPluginInputChannel,templatingTransformer,templatingPluginOutputChannel,templatingOutputRouter,templatingEmailOutputChannel,templatingEmailGroupingRouter,templatingPreAggregatorChannel,templatingPreAggregatorTransformer,templatingDailyAggregatorInputChannel,templatingDailyAggregator,templatingDailyAggregatorOutputChannel, JdbcMessageStore.CREATED_DATE=1426090800772, jms_type=, jms_redelivered=false, priority=0, templatingEmailGrouping=DAILY, jms_correlationId=, JdbcMessageStore.SAVED=true, templatingEmailGroupingCategory=DAILY}]
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [select MESSAGE_ID, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (select MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and REGION=?) ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, CREATED_DATE, MESSAGE_BYTES from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning BLOB as bytes
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Removing relationships for the group with group key=fb90fe78-c3df-3793-9ee8-acae4924bebe
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_MESSAGE_GROUP where GROUP_KEY=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Marking messages with group key=fb90fe78-c3df-3793-9ee8-acae4924bebe
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
ERROR [exec-1] - Exception in expiry callback
java.lang.RuntimeException: test crash
at ch.post.ehealth.extcom.templating.core.DailyDigestTransformer.processMessage(DailyDigestTransformer.java:33) ~[extcom-templating.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:63) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:95) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:44) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:258) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:84) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:112) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:111) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:64) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:68) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.integration.transformer.MessageTransformationException: ; nested exception is org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:74) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:657) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:642) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:619) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:543) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$ForceReleaseMessageGroupProcessor.processMessageGroup(AbstractCorrelatingMessageHandler.java:721) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.execute(AbstractCorrelatingMessageHandler.java:168) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:169) [spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:113) [spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:115) [spring-integration-core-4.1.2.RELEASE.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.7.0_71]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_71]
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, MESSAGE_BYTES, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (SELECT MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY = ?) and REGION=? ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE from INT_MESSAGE_GROUP where GROUP_KEY = ? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Group expiry candidate (fb90fe78-c3df-3793-9ee8-acae4924bebe) has changed - it may be reconsidered for a future expiration
ERROR [exec-1] - Unexpected error occurred in scheduled task.
java.lang.RuntimeException: test crash
at ch.post.ehealth.extcom.templating.core.DailyDigestTransformer.processMessage(DailyDigestTransformer.java:33) ~[extcom-templating.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:63) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:95) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:44) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:258) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:84) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:112) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:111) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:64) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:68) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.integration.transformer.MessageTransformationException: ; nested exception is org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:74) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:657) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:642) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:619) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:543) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$ForceReleaseMessageGroupProcessor.processMessageGroup(AbstractCorrelatingMessageHandler.java:721) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.execute(AbstractCorrelatingMessageHandler.java:168) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:169) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:113) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:115) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65) ~[spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.7.0_71]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_71]
DEBUG [exec-1] - Expiring all messages older than timeout=10 from message group store: org.springframework.integration.jdbc.JdbcMessageStore@3e082583
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT distinct GROUP_KEY as CREATED from INT_MESSAGE_GROUP where REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
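Well, it looks like you are on the right track.
You can wrap the aggregator in a TX for both JMS and JDBC, and group the messages by the desired correlationKey, e.g. with correlation-strategy-expression.
Since you don't want to release the group until some daily event (e.g. a cron trigger), you should mark your aggregator with these options: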
release-strategy-expression="false"
send-partial-result-on-expiry="true"
expire-groups-upon-completion="true"
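- The first: disallows the normal release.
- The second: sends the aggregator result to the output-channel, rather than to the discard-channel, on expire.
- The third: removes the group from the MessageStore so that a new group can be formed for the same correlationKey.
To make this work you should configure a MessageGroupStoreReaper: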
<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="10"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" cron="0 0 * * * * "/>
</task:scheduled-tasks>
<task:scheduler id="scheduler"/>
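The Reaper invokes the MessageGroupCallback that is registered during aggregator initialization. That callback calls forceComplete on the aggregator, which is what lets all the options mentioned above take effect.
Since SI 4.1 the <aggregator> supports the <expire-transactional> sub-element, which allows forceComplete to be wrapped in a TX, as you asked.
Before that, we needed to wrap the MessageGroupStoreReaper.run() method in a TX (e.g. using <tx:advice>).
Hope I am clear.
UPDATE
Sorry, I've found that we have a bug. In the documentation on <expire-transactional> we say: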
Allows a transaction to be started for the forceComplete operation. It is initiated from a group-timeout(-expression) or by a MessageGroupStoreReaper and is not applied to the normal add/release/discard operations. Only this sub-element or <expire-advice-chain/> is allowed.
But that isn't the case. The code looks like:
if (this.groupTimeoutExpression != null && !CollectionUtils.isEmpty(this.forceReleaseAdviceChain)) {
ProxyFactory proxyFactory = new ProxyFactory(processor);
for (Advice advice : this.forceReleaseAdviceChain) {
proxyFactory.addAdvice(advice);
}
return (MessageGroupProcessor) proxyFactory.getProxy(getApplicationContext().getClassLoader());
}
So we apply <expire-transactional> only when group-timeout(-expression) is provided, but not for the MessageGroupStoreReaper.
Feel free to raise a JIRA issue on the matter and we'll address it soon.
In the meantime, you should use <tx:advice> for MessageGroupStoreReaper.run().
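A minimal sketch of that workaround, reusing the bean names from the question's configuration. The advice id reaperTxAdvice is hypothetical, and the sketch assumes the tx and aop namespaces are declared on the root <beans> element and that AspectJ weaver is on the classpath so the pointcut expression can be parsed:

```xml
<!-- Hypothetical advice id; the transaction manager is the one defined in the question. -->
<tx:advice id="reaperTxAdvice"
           transaction-manager="templatingAggregatorStoreTransactionManager">
    <tx:attributes>
        <!-- Start a transaction around MessageGroupStoreReaper.run();
             by default it is rolled back on any RuntimeException. -->
        <tx:method name="run"/>
    </tx:attributes>
</tx:advice>

<aop:config>
    <!-- Apply the advice only to the reaper bean from the question. -->
    <aop:advisor advice-ref="reaperTxAdvice"
                 pointcut="bean(templatingEmailAggregatorStoreReaper)"/>
</aop:config>
```

With this in place the scheduled task calls run() through the transactional proxy, so the JdbcMessageStore DELETE statements and the downstream flow should execute in one transaction, provided the output channels are direct (same thread). An exception that propagates out of run(), like the test crash in the log above, should then roll the deletes back and leave the group in the database for the next reaper run.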
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
INFO [exec-1] - Expiring MessageGroup with correlationKey[fb90fe78-c3df-3793-9ee8-acae4924bebe]
DEBUG [exec-1] - Prematurely releasing partially complete group with key [fb90fe78-c3df-3793-9ee8-acae4924bebe] to: templatingDailyAggregatorOutputChannel
DEBUG [exec-1] - Completing group with correlationKey [fb90fe78-c3df-3793-9ee8-acae4924bebe]
DEBUG [exec-1] - org.springframework.integration.transformer.MessageTransformingHandler@2fdde28c received message: GenericMessage [payload=[ch.post.ehealth.extcom.templating.core.PreAggregatorEmailMessage@563b5cce], headers={jms_timestamp=1426090799820, extcomPluginDestination=templatingPluginInputChannel, extcomId=5782d059-c88d-44ae-82a1-0b738b43e821, jms_messageId=ID:some-vm-45363-1421229178337-3:9:1:1:4, timestamp=1426090920046, id=b8330bed-fc74-c8d5-6838-a3116a05ab39, history=jmsInputAdapter,inputChannel,xmlToSpringIntTransformer,pluginRouterChannel,pluginRouter,templatingPluginInputChannel,templatingTransformer,templatingPluginOutputChannel,templatingOutputRouter,templatingEmailOutputChannel,templatingEmailGroupingRouter,templatingPreAggregatorChannel,templatingPreAggregatorTransformer,templatingDailyAggregatorInputChannel,templatingDailyAggregator,templatingDailyAggregatorOutputChannel, JdbcMessageStore.CREATED_DATE=1426090800772, jms_type=, jms_redelivered=false, priority=0, templatingEmailGrouping=DAILY, jms_correlationId=, JdbcMessageStore.SAVED=true, templatingEmailGroupingCategory=DAILY}]
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [select MESSAGE_ID, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (select MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and REGION=?) ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, CREATED_DATE, MESSAGE_BYTES from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning BLOB as bytes
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Removing relationships for the group with group key=fb90fe78-c3df-3793-9ee8-acae4924bebe
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_MESSAGE_GROUP where GROUP_KEY=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Marking messages with group key=fb90fe78-c3df-3793-9ee8-acae4924bebe
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
ERROR [exec-1] - Exception in expiry callback
java.lang.RuntimeException: test crash
at ch.post.ehealth.extcom.templating.core.DailyDigestTransformer.processMessage(DailyDigestTransformer.java:33) ~[extcom-templating.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:63) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:95) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:44) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:258) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:84) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:112) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:111) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:64) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:68) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.integration.transformer.MessageTransformationException: ; nested exception is org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:74) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:657) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:642) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:619) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:543) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$ForceReleaseMessageGroupProcessor.processMessageGroup(AbstractCorrelatingMessageHandler.java:721) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.execute(AbstractCorrelatingMessageHandler.java:168) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:169) [spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:113) [spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:115) [spring-integration-core-4.1.2.RELEASE.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.7.0_71]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_71]
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, MESSAGE_BYTES, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (SELECT MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY = ?) and REGION=? ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE from INT_MESSAGE_GROUP where GROUP_KEY = ? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Group expiry candidate (fb90fe78-c3df-3793-9ee8-acae4924bebe) has changed - it may be reconsidered for a future expiration
ERROR [exec-1] - Unexpected error occurred in scheduled task.
java.lang.RuntimeException: test crash
at ch.post.ehealth.extcom.templating.core.DailyDigestTransformer.processMessage(DailyDigestTransformer.java:33) ~[extcom-templating.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:63) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:95) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:44) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:258) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:84) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:112) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:111) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:64) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:68) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.integration.transformer.MessageTransformationException: ; nested exception is org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:74) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:657) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:642) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:619) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:543) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$ForceReleaseMessageGroupProcessor.processMessageGroup(AbstractCorrelatingMessageHandler.java:721) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.execute(AbstractCorrelatingMessageHandler.java:168) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:169) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:113) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:115) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65) ~[spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.7.0_71]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_71]
DEBUG [exec-1] - Expiring all messages older than timeout=10 from message group store: org.springframework.integration.jdbc.JdbcMessageStore@3e082583
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT distinct GROUP_KEY as CREATED from INT_MESSAGE_GROUP where REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
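Well, it looks like you are on the right track.
You can enclose the aggregator in a TX for both JMS and JDBC, and group messages by the desired correlationKey, e.g. with correlation-strategy-expression.
Since you don't want to release the group until some daily event (e.g. a cron trigger), you should mark your aggregator with these options: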
release-strategy-expression="false"
send-partial-result-on-expiry="true"
expire-groups-upon-completion="true"
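- the first one: doesn't allow a normal release
- the second: sends the aggregator result to the output-channel, not to the discard-channel, on expire
- the third: removes the group from the MessageStore so that a new group can be formed for the same correlationKey
To make this work you should configure a MessageGroupStoreReaper: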
<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="10"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" cron="0 0 * * * * "/>
</task:scheduled-tasks>
<task:scheduler id="scheduler"/>
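Note that the cron expression above fires at the top of every hour, since Spring cron expressions have six fields (second, minute, hour, day of month, month, day of week). For a once-a-day digest the schedule could look like the sketch below. The 06:00 hour is an arbitrary assumption for illustration; the bean names are the ones used in the snippet above.

```xml
<!-- Spring cron fields: second minute hour day-of-month month day-of-week -->
<task:scheduled-tasks scheduler="scheduler">
    <!-- Runs the reaper once a day at 06:00:00 (hour chosen only as an example) -->
    <task:scheduled ref="reaper" method="run" cron="0 0 6 * * *"/>
</task:scheduled-tasks>
```

With timeout set to 10 (milliseconds), every group older than 10 ms is expired on each run, so in practice each daily run releases all groups collected so far.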
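The reaper calls the MessageGroupCallback that the aggregator registers during its initialization. That callback invokes forceComplete on the aggregator, which is where all of the options mentioned above come into play.
Starting with SI 4.1, the <aggregator> supports an <expire-transactional> sub-element, which allows forceComplete to be wrapped in a TX, just as you asked.
Before that, we had to wrap the MessageGroupStoreReaper.run() method in a TX ourselves (e.g. using <tx:advice>).
Hope that's clear.
UPDATE
Sorry, I see that we have a bug. In the documentation for <expire-transactional> we say: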
Allows a transaction to be started for the forceComplete operation. It is initiated from a group-timeout(-expression) or by a MessageGroupStoreReaper and is not applied to the normal add/release/discard operations. Only this sub-element or <expire-advice-chain/> is allowed.
But that isn't what actually happens. The code looks like this:
if (this.groupTimeoutExpression != null && !CollectionUtils.isEmpty(this.forceReleaseAdviceChain)) {
ProxyFactory proxyFactory = new ProxyFactory(processor);
for (Advice advice : this.forceReleaseAdviceChain) {
proxyFactory.addAdvice(advice);
}
return (MessageGroupProcessor) proxyFactory.getProxy(getApplicationContext().getClassLoader());
}
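So we apply <expire-transactional> only when a group-timeout(-expression) is provided, and not when the expiry comes from a MessageGroupStoreReaper.
Feel free to raise a JIRA issue on the matter and we'll take care of it soon.
In the meantime, you should use <tx:advice> around MessageGroupStoreReaper.run().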
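A minimal sketch of that workaround, reusing the transaction manager bean from the question. The ids reaperTxAdvice and reaperRunPointcut are made up for illustration, and the sketch assumes the tx and aop namespaces are declared in the context file, that the AspectJ weaver is on the classpath for the pointcut expression, and that the message store tables use a transactional engine (e.g. InnoDB on MySQL):

```xml
<!-- Starts a transaction around run(); by default it rolls back on any RuntimeException -->
<tx:advice id="reaperTxAdvice"
           transaction-manager="templatingAggregatorStoreTransactionManager">
    <tx:attributes>
        <tx:method name="run"/>
    </tx:attributes>
</tx:advice>

<aop:config>
    <!-- Matches only the reaper's run() method -->
    <aop:pointcut id="reaperRunPointcut"
        expression="execution(* org.springframework.integration.store.MessageGroupStoreReaper.run(..))"/>
    <aop:advisor advice-ref="reaperTxAdvice" pointcut-ref="reaperRunPointcut"/>
</aop:config>
```

With this in place, an exception thrown downstream of the aggregator on the reaper thread (as in the test crash above) should propagate out of run() and roll back the DELETE statements against the message store, so the group stays in the database for the next run. One trade-off to keep in mind: the whole reaper run shares a single transaction, so a failure on one group also rolls back the removal of groups that were already sent in the same run, and those may be emailed again.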