Spring Integration - scheduled removal of message groups
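I use the Spring Integration aggregator pattern with a JdbcMessageStore as the message store.
I know that .expireGroupsUponCompletion(true) and .expireGroupsUponTimeout(true) remove a group from the store.
I would like to know, when both of these properties are false, how to additionally remove groups at a scheduled time (for example at the end of the day). Is that possible?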
UPDATE
Aggregator
@Bean
public IntegrationFlow aggregatorSession(MessageGroupStore jdbcMessageStoreSession) {
    return IntegrationFlows.from(aggregatorSessionChannel())
            .aggregate(g -> g.poller(Pollers.fixedDelay(1000).errorChannel("error.input"))
                    .transactional(true)
                    .discardChannel("discardSession.input")
                    .sendPartialResultOnExpiry(true)
                    .expireGroupsUponCompletion(false)
                    .expireGroupsUponTimeout(false)
                    .messageStore(jdbcMessageStoreSession)
                    .correlationExpression("payload.item.ticket")
                    .releaseStrategy(r -> r.size() == 2)
                    .outputProcessor(outputSessionProcessor()))
            ...
}
Configuration
@Bean
public MessageGroupStore jdbcMessageStoreSession(DataSource dataSource) {
    JdbcMessageStore jdbcMessageStore = new JdbcMessageStore(dataSource);
    jdbcMessageStore.setRegion("session");
    return jdbcMessageStore;
}

@Bean
public MessageGroupStoreReaper messageGroupStoreSessionReaper(MessageGroupStore jdbcMessageStoreSession) {
    MessageGroupStoreReaper messageGroupStoreReaper = new MessageGroupStoreReaper();
    messageGroupStoreReaper.setTimeout(3000);
    messageGroupStoreReaper.setMessageGroupStore(jdbcMessageStoreSession);
    return messageGroupStoreReaper;
}
Scheduled task
@Service
@RequiredArgsConstructor
public class SchedulerReaperService {

    private final MessageGroupStoreReaper messageGroupStoreSessionReaper;

    @Scheduled(fixedRate = 2000, initialDelay = 1000)
    @Transactional
    public void poll() {
        messageGroupStoreSessionReaper.run();
    }
}
Debug log
2022-04-04 12:07:38.377 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [sk/vub/quatro/integration/AmqpIntegration.class]'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=null, documentId=null, type=null, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=null), order=1), headers={nativeHeaders={}, id=b3114fd5-cb20-bf6f-0d02-1418660bbd41}]
2022-04-04 12:07:38.378 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Handling message with correlationKey [d0643e59-4f37-4df6-8548-1be388cc895d]: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=null, documentId=null, type=null, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=null), order=1), headers={nativeHeaders={}, id=b3114fd5-cb20-bf6f-0d02-1418660bbd41}]
2022-04-04 12:07:38.382 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [sk/vub/quatro/integration/AmqpIntegration.class]'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=e361ecee-3211-4f1d-b2de-fe002effa3fb, documentId=1, type=CONTRACT, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=284680~^~CIS), order=0), headers={nativeHeaders={}, documentType=CONTRACT, amqp_replyTo=event-exchange/event.response, amqp_correlationId=7ed32e1a-9887-4e0c-b9b0-0a12da6452f9, id=8521c14a-7a19-5b2c-3942-5e85bae0bd0e}]
2022-04-04 12:07:38.383 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Handling message with correlationKey [d0643e59-4f37-4df6-8548-1be388cc895d]: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=e361ecee-3211-4f1d-b2de-fe002effa3fb, documentId=1, type=CONTRACT, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=284680~^~CIS), order=0), headers={nativeHeaders={}, documentType=CONTRACT, amqp_replyTo=event-exchange/event.response, amqp_correlationId=7ed32e1a-9887-4e0c-b9b0-0a12da6452f9, id=8521c14a-7a19-5b2c-3942-5e85bae0bd0e}]
2022-04-04 12:07:38.385 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Completing group with correlationKey [d0643e59-4f37-4df6-8548-1be388cc895d]
2022-04-04 12:07:41.455 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration
2022-04-04 12:07:43.477 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration
2022-04-04 12:07:45.497 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration
2022-04-04 12:07:47.517 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration
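See MessageGroupStoreReaper and how to configure it for a scheduled task, for example via cron at the end of the day:
https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator
The sample there is in XML, but the Java configuration is not much different: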
<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
See also @EnableScheduling and @Scheduled for the Java configuration.
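A rough Java equivalent of the XML sample might look like the sketch below. The class names ReaperConfig and EndOfDayReaperTask, the bean name, and the cron expression "0 59 23 * * *" (every day at 23:59:00) are assumptions made for illustration; the timeout is copied from the XML sample. It also assumes the application context contains a single MessageGroupStore bean.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.MessageGroupStoreReaper;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Configuration
@EnableScheduling
public class ReaperConfig {

    // Same wiring as the XML <bean id="reaper"> definition.
    @Bean
    public MessageGroupStoreReaper reaper(MessageGroupStore messageStore) {
        MessageGroupStoreReaper reaper = new MessageGroupStoreReaper();
        reaper.setMessageGroupStore(messageStore);
        reaper.setTimeout(30000); // only groups older than 30 seconds are considered
        return reaper;
    }
}

// Hypothetical component that replaces <task:scheduled> with a cron trigger.
@Component
class EndOfDayReaperTask {

    private final MessageGroupStoreReaper reaper;

    EndOfDayReaperTask(MessageGroupStoreReaper reaper) {
        this.reaper = reaper;
    }

    // Spring cron fields: second, minute, hour, day of month, month, day of week.
    @Scheduled(cron = "0 59 23 * * *")
    public void reapAtEndOfDay() {
        this.reaper.run();
    }
}
```

With a once-a-day trigger, keep in mind that a group which still holds messages when it expires may only be emptied on that run and removed on a later one, so a second trigger shortly afterwards may be needed.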
UPDATE
The logic on the aggregator side looks like this:
if (groupSize > 0) {
    noOutput = false;
    if (this.releaseStrategy.canRelease(groupNow)) {
        completeGroup(correlationKey, groupNow, lock);
    }
    else {
        expireGroup(correlationKey, groupNow, lock);
    }
    if (!this.expireGroupsUponTimeout) {
        afterRelease(groupNow, groupNow.getMessages(), true);
        removeGroup = false;
    }
}
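So, if your group is not empty (for example, it holds 1 message) and cannot be released normally, it is expired, and the messages that were just released are then removed from the group. The next time the reaper runs, the group reaches the following branch: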
/*
 * By default empty groups are removed on the same schedule as non-empty
 * groups. A longer timeout for empty groups can be enabled by
 * setting minimumTimeoutForEmptyGroups.
 */
removeGroup =
        lastModifiedNow <= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups);
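Now the empty group is removed.
Please verify in debug mode that your reaper timeout really is suitable, so that your group is considered old enough to expire on the first and on the following reaper cycles.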
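The two passes described above can be illustrated with a small stand-alone model. This is a simplified sketch, not the Spring Integration implementation: the class TwoPassReaperModel, its Group type and the reap method are hypothetical, and the model compares a single reference timestamp against the timeout and ignores minimumTimeoutForEmptyGroups.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TwoPassReaperModel {

    /** Simplified stand-in for a stored message group (not a Spring Integration class). */
    static final class Group {
        final long timestamp;                          // reference time for the age check
        final List<String> messages = new ArrayList<>();

        Group(long timestamp, String... payloads) {
            this.timestamp = timestamp;
            this.messages.addAll(Arrays.asList(payloads));
        }
    }

    /**
     * One reaper pass over a single group.
     * Returns true when the group itself should be removed from the store.
     */
    static boolean reap(Group group, long now, long timeout, boolean expireGroupsUponTimeout) {
        if (group.timestamp > now - timeout) {
            return false;                              // not old enough yet, leave it alone
        }
        if (!group.messages.isEmpty()) {
            group.messages.clear();                    // expired: messages leave the group
            return expireGroupsUponTimeout;            // false keeps the now-empty group
        }
        return true;                                   // already empty: remove the group
    }

    public static void main(String[] args) {
        Group group = new Group(0, "test1");           // incomplete: release needs 2 messages
        boolean removedOnFirstPass = reap(group, 5000, 3000, false);
        boolean removedOnSecondPass = reap(group, 7000, 3000, false);
        System.out.println("removed on first pass:  " + removedOnFirstPass);   // false
        System.out.println("removed on second pass: " + removedOnSecondPass);  // true
    }
}
```

In this model, with expireGroupsUponTimeout set to false, the first pass only empties the group and the second pass removes it, which mirrors the sequence described in this answer.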
UPDATE 2
Here is a simple working example:
@SpringBootApplication
@EnableScheduling
public class So71617406Application {

    public static void main(String[] args) {
        SpringApplication.run(So71617406Application.class, args);
    }

    @Bean
    public MessageGroupStore jdbcMessageStoreSession(DataSource dataSource) {
        JdbcMessageStore jdbcMessageStore = new JdbcMessageStore(dataSource);
        jdbcMessageStore.setRegion("session");
        return jdbcMessageStore;
    }

    @Bean
    public MessageGroupStoreReaper messageGroupStoreSessionReaper(MessageGroupStore jdbcMessageStoreSession) {
        MessageGroupStoreReaper messageGroupStoreReaper = new MessageGroupStoreReaper();
        messageGroupStoreReaper.setTimeout(3000);
        messageGroupStoreReaper.setMessageGroupStore(jdbcMessageStoreSession);
        return messageGroupStoreReaper;
    }

    @Autowired
    @Lazy
    private MessageGroupStoreReaper messageGroupStoreSessionReaper;

    @Scheduled(fixedRate = 2000, initialDelay = 1000)
    @Transactional
    public void poll() {
        this.messageGroupStoreSessionReaper.run();
    }

    @Bean
    public IntegrationFlow aggregatorSession(MessageGroupStore jdbcMessageStoreSession) {
        return IntegrationFlows.from("aggregatorSessionChannel")
                .aggregate(g -> g
                        .transactional(true)
                        .sendPartialResultOnExpiry(true)
                        .expireGroupsUponCompletion(false)
                        .expireGroupsUponTimeout(false)
                        .messageStore(jdbcMessageStoreSession)
                        .correlationExpression("1")
                        .releaseStrategy(r -> r.size() == 2))
                .nullChannel();
    }

    @Bean
    ApplicationRunner applicationRunner(MessageChannel aggregatorSessionChannel) {
        return args -> {
            aggregatorSessionChannel.send(new GenericMessage<>("test1"));
            aggregatorSessionChannel.send(new GenericMessage<>("test2"));
        };
    }
}
Its log looks like this:
2022-04-04 11:10:40.149 DEBUG 2760 --- [ main] o.s.i.a.AggregatingMessageHandler : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=test1, headers={id=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4, timestamp=1649085040146}]
2022-04-04 11:10:40.150 DEBUG 2760 --- [ main] o.s.i.a.AggregatingMessageHandler : Handling message with correlationKey [1]: GenericMessage [payload=test1, headers={id=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4, timestamp=1649085040146}]
2022-04-04 11:10:40.166 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Inserting message with id key=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4
2022-04-04 11:10:40.174 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Inserting message with id key=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4 and created date=2022-04-04 11:10:40.164
2022-04-04 11:10:40.175 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Creating message group with id key=c4ca4238-a0b9-3382-8dcc-509a6f75849b and created date=2022-04-04 11:10:40.164
2022-04-04 11:10:40.176 DEBUG 2760 --- [ main] o.s.i.store.PersistentMessageGroup : Lazy loading of group size for messageGroup: 1
2022-04-04 11:10:40.183 DEBUG 2760 --- [ main] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'aggregatorSessionChannel'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession'', message: GenericMessage [payload=test1, headers={id=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4, timestamp=1649085040146}]
2022-04-04 11:10:40.183 DEBUG 2760 --- [ main] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'aggregatorSessionChannel'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession'', message: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:40.183 DEBUG 2760 --- [ main] o.s.i.a.AggregatingMessageHandler : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:40.183 DEBUG 2760 --- [ main] o.s.i.a.AggregatingMessageHandler : Handling message with correlationKey [1]: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:40.184 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Inserting message with id key=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd
2022-04-04 11:10:40.185 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Inserting message with id key=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd and created date=2022-04-04 11:10:40.164
2022-04-04 11:10:40.185 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Updating MessageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.186 DEBUG 2760 --- [ main] o.s.i.store.PersistentMessageGroup : Lazy loading of group size for messageGroup: 1
2022-04-04 11:10:40.187 DEBUG 2760 --- [ main] o.s.i.a.AggregatingMessageHandler : Completing group with correlationKey [1]
2022-04-04 11:10:40.187 DEBUG 2760 --- [ main] o.s.i.store.PersistentMessageGroup : Lazy loading of messages for messageGroup: 1
2022-04-04 11:10:40.191 DEBUG 2760 --- [ main] o.s.integration.channel.NullChannel : message sent to null channel: GenericMessage [payload=[test1, test2], headers={id=5442d9a6-3549-041c-7f49-557b3df62749, timestamp=1649085040191}]
2022-04-04 11:10:40.191 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Completing MessageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.191 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Removing messages from group with group key=c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.193 DEBUG 2760 --- [ main] o.s.i.jdbc.store.JdbcMessageStore : Updating MessageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.193 DEBUG 2760 --- [ main] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'aggregatorSessionChannel'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession'', message: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:41.159 DEBUG 2760 --- [ scheduling-1] o.s.i.store.MessageGroupStoreReaper : Expiring all messages older than timeout=3000 from message group store: org.springframework.integration.jdbc.store.JdbcMessageStore@31464a43
2022-04-04 11:10:43.156 DEBUG 2760 --- [ scheduling-1] o.s.i.store.MessageGroupStoreReaper : Expiring all messages older than timeout=3000 from message group store: org.springframework.integration.jdbc.store.JdbcMessageStore@31464a43
2022-04-04 11:10:45.147 DEBUG 2760 --- [ scheduling-1] o.s.i.store.MessageGroupStoreReaper : Expiring all messages older than timeout=3000 from message group store: org.springframework.integration.jdbc.store.JdbcMessageStore@31464a43
2022-04-04 11:10:45.148 DEBUG 2760 --- [ scheduling-1] o.s.i.store.PersistentMessageGroup : Lazy loading of group size for messageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:45.149 DEBUG 2760 --- [ scheduling-1] o.s.i.a.AggregatingMessageHandler : Removing empty group: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:45.149 DEBUG 2760 --- [ scheduling-1] o.s.i.jdbc.store.JdbcMessageStore : Removing relationships for the group with group key=c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:45.150 DEBUG 2760 --- [ scheduling-1] o.s.i.jdbc.store.JdbcMessageStore : Deleting messages with group key=c4ca4238-a0b9-3382-8dcc-509a6f75849b
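Pay attention to the "Removing empty group:" line. It looks like, for some reason, your group is not empty after it completes. Perhaps your outputSessionProcessor does something wrong and throws an error, so that the completed messages are not removed from the database because the transaction is rolled back...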
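The timing visible in the log above can be checked with a short calculation. This is only a sketch: the class ReaperTiming and the method firstExpiringRun are hypothetical, and it assumes the age is measured from the group's creation time shown in the log; if your store measures from the last modification instead, substitute that value.

```java
public class ReaperTiming {

    /** Index (0-based) of the first reaper run at which the group is old enough, or -1 if none. */
    static int firstExpiringRun(long groupTimestamp, long timeout, long[] runTimes) {
        for (int i = 0; i < runTimes.length; i++) {
            if (groupTimestamp <= runTimes[i] - timeout) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Milliseconds after 11:10:40.000, read off the log above.
        long created = 164;                        // created date=2022-04-04 11:10:40.164
        long[] reaperRuns = {1159, 3156, 5147};    // 11:10:41.159, 11:10:43.156, 11:10:45.147
        System.out.println(firstExpiringRun(created, 3000, reaperRuns)); // prints 2
    }
}
```

With timeout=3000 the group is still too young at the first two runs (about 1 s and just under 3 s old) and is only picked up at the third run, which is where the log shows "Removing empty group".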