Spring Integration - scheduled removal of message groups

I am using the Spring Integration aggregator pattern with a JdbcMessageStore as the message store.

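I know that .expireGroupsUponCompletion(true).expireGroupsUponTimeout(true) removes a group from the store. If both properties are false, how can I additionally remove groups at a scheduled time, for example at the end of the day? Is that possible?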

Update

Aggregator

@Bean
public IntegrationFlow aggregatorSession(MessageGroupStore jdbcMessageStoreSession) {
    return IntegrationFlows.from(aggregatorSessionChannel())
            .aggregate(g -> g.poller(Pollers.fixedDelay(1000).errorChannel("error.input"))
                    .transactional(true)
                    .discardChannel("discardSession.input")
                    .sendPartialResultOnExpiry(true)
                    .expireGroupsUponCompletion(false)
                    .expireGroupsUponTimeout(false)
                    .messageStore(jdbcMessageStoreSession)
                    .correlationExpression("payload.item.ticket")
                    .releaseStrategy(r -> r.size() == 2)
                    .outputProcessor(outputSessionProcessor()))
            ...
}

Configuration

@Bean
public MessageGroupStore jdbcMessageStoreSession(DataSource dataSource) {
    JdbcMessageStore jdbcMessageStore = new JdbcMessageStore(dataSource);
    jdbcMessageStore.setRegion("session");
    return jdbcMessageStore;
}

@Bean
public MessageGroupStoreReaper messageGroupStoreSessionReaper(MessageGroupStore jdbcMessageStoreSession) {
    MessageGroupStoreReaper messageGroupStoreReaper = new MessageGroupStoreReaper();
    messageGroupStoreReaper.setTimeout(3000);
    messageGroupStoreReaper.setMessageGroupStore(jdbcMessageStoreSession);
    return messageGroupStoreReaper;
}

Scheduled task

@Service
@RequiredArgsConstructor
public class SchedulerReaperService {

    private final MessageGroupStoreReaper messageGroupStoreSessionReaper;

    @Scheduled(fixedRate = 2000, initialDelay = 1000)
    @Transactional
    public void poll() {
        messageGroupStoreSessionReaper.run();
    }
}

Debug log

2022-04-04 12:07:38.377 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler        : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [sk/vub/quatro/integration/AmqpIntegration.class]'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=null, documentId=null, type=null, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=null), order=1), headers={nativeHeaders={}, id=b3114fd5-cb20-bf6f-0d02-1418660bbd41}]
2022-04-04 12:07:38.378 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler        : Handling message with correlationKey [d0643e59-4f37-4df6-8548-1be388cc895d]: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=null, documentId=null, type=null, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=null), order=1), headers={nativeHeaders={}, id=b3114fd5-cb20-bf6f-0d02-1418660bbd41}]
2022-04-04 12:07:38.382 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler        : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [sk/vub/quatro/integration/AmqpIntegration.class]'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=e361ecee-3211-4f1d-b2de-fe002effa3fb, documentId=1, type=CONTRACT, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=284680~^~CIS), order=0), headers={nativeHeaders={}, documentType=CONTRACT, amqp_replyTo=event-exchange/event.response, amqp_correlationId=7ed32e1a-9887-4e0c-b9b0-0a12da6452f9, id=8521c14a-7a19-5b2c-3942-5e85bae0bd0e}]
2022-04-04 12:07:38.383 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler        : Handling message with correlationKey [d0643e59-4f37-4df6-8548-1be388cc895d]: GenericMessage [payload=AggregatorWrap(item=DocumentDTO(ticket=d0643e59-4f37-4df6-8548-1be388cc895d, state=OPEN, signingSessionId=e361ecee-3211-4f1d-b2de-fe002effa3fb, documentId=1, type=CONTRACT, businessKey=85a6a6fd-f1fb-4599-9b0c-ea007f6f9f29, partyId=284680~^~CIS), order=0), headers={nativeHeaders={}, documentType=CONTRACT, amqp_replyTo=event-exchange/event.response, amqp_correlationId=7ed32e1a-9887-4e0c-b9b0-0a12da6452f9, id=8521c14a-7a19-5b2c-3942-5e85bae0bd0e}]
2022-04-04 12:07:38.385 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler        : Completing group with correlationKey [d0643e59-4f37-4df6-8548-1be388cc895d]

2022-04-04 12:07:41.455 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler        : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration
2022-04-04 12:07:43.477 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler        : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration
2022-04-04 12:07:45.497 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler        : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration
2022-04-04 12:07:47.517 DEBUG [scheduling-1] o.s.i.a.AggregatingMessageHandler        : Group expiry candidate (d0643e59-4f37-4df6-8548-1be388cc895d) has changed - it may be reconsidered for a future expiration

See MessageGroupStoreReaper and how to configure it for a scheduled task, for example via cron at the end of the day: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator

The sample there is in XML, but the Java configuration is not much different:

<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

For Java configuration, see @EnableScheduling and @Scheduled.
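A minimal Java sketch of the same reaper, driven by a cron expression instead of a fixed rate, might look like this. The class names EndOfDayReaperConfig and EndOfDayReaperTask, the bean name and the 23:59 cron value are illustrative assumptions; a MessageGroupStore bean is assumed to be defined elsewhere, and the timeout is simply copied from the XML sample.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.MessageGroupStoreReaper;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Hypothetical configuration class; mirrors the XML <bean id="reaper"> above.
@Configuration
@EnableScheduling
class EndOfDayReaperConfig {

    @Bean
    MessageGroupStoreReaper reaper(MessageGroupStore messageStore) {
        MessageGroupStoreReaper reaper = new MessageGroupStoreReaper();
        reaper.setMessageGroupStore(messageStore);
        reaper.setTimeout(30000); // same value as in the XML sample
        return reaper;
    }
}

// Hypothetical scheduled task; replaces <task:scheduled ... fixed-rate="10000"/>.
@Component
class EndOfDayReaperTask {

    private final MessageGroupStoreReaper reaper;

    EndOfDayReaperTask(MessageGroupStoreReaper reaper) {
        this.reaper = reaper;
    }

    // Spring cron fields: second minute hour day-of-month month day-of-week.
    // "0 59 23 * * *" fires once a day at 23:59:00 (an assumed "end of day").
    @Scheduled(cron = "0 59 23 * * *")
    public void reapAtEndOfDay() {
        this.reaper.run();
    }
}
```

Each run only considers groups older than the configured timeout, so the timeout should be chosen with the daily schedule in mind.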

Update

The logic on the aggregator side looks like this:

                if (groupSize > 0) {
                    noOutput = false;
                    if (this.releaseStrategy.canRelease(groupNow)) {
                        completeGroup(correlationKey, groupNow, lock);
                    }
                    else {
                        expireGroup(correlationKey, groupNow, lock);
                    }
                    if (!this.expireGroupsUponTimeout) {
                        afterRelease(groupNow, groupNow.getMessages(), true);
                        removeGroup = false;
                    }
                }

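So, if your group is not empty (say, it holds one message) and cannot be released normally, it is expired instead, and the messages that were just released are removed from the group. Because expireGroupsUponTimeout is false, the group itself is kept. The next time the reaper runs, it reaches this branch: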

                    /*
                     * By default empty groups are removed on the same schedule as non-empty
                     * groups. A longer timeout for empty groups can be enabled by
                     * setting minimumTimeoutForEmptyGroups.
                     */
                    removeGroup =
                            lastModifiedNow <= (System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups);

At that point the empty group is removed.

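Please verify in debug mode that your reaper timeout really is long enough for your group to count as old enough to be expired on the first reaper cycle and on the next one.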
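The two passes described above can be modelled with a small plain-Java sketch. Everything in it (ReaperPassSketch, Group, Outcome, reap) is hypothetical and only mirrors the quoted branches; it is not Spring code. It also assumes that a group becomes a candidate once it was created at least the reaper timeout ago, which is a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the reaper passes; none of these are Spring types.
class ReaperPassSketch {

    enum Outcome { SKIPPED, MESSAGES_EXPIRED, GROUP_REMOVED }

    static final class Group {
        final List<String> messages = new ArrayList<>();
        final long created;
        long lastModified;
        boolean removed;

        Group(long created, String... payloads) {
            this.created = created;
            this.lastModified = created;
            this.messages.addAll(Arrays.asList(payloads));
        }
    }

    // One reaper pass at time 'now' for an aggregator with expireGroupsUponTimeout(false).
    static Outcome reap(Group group, long now, long reaperTimeout, long minimumTimeoutForEmptyGroups) {
        // Assumption: only groups created at least 'reaperTimeout' ms ago are candidates.
        if (group.removed || group.created > now - reaperTimeout) {
            return Outcome.SKIPPED;
        }
        if (!group.messages.isEmpty()) {
            // Non-empty group: its messages are completed or expired and then removed,
            // but the (now empty) group stays in the store.
            group.messages.clear();
            group.lastModified = now;
            return Outcome.MESSAGES_EXPIRED;
        }
        // Empty group: removed once it has been unmodified for long enough.
        if (group.lastModified <= now - minimumTimeoutForEmptyGroups) {
            group.removed = true;
            return Outcome.GROUP_REMOVED;
        }
        return Outcome.SKIPPED;
    }

    public static void main(String[] args) {
        Group group = new Group(0L, "test1");
        System.out.println(reap(group, 1_000L, 3_000L, 0L)); // group is still too young
        System.out.println(reap(group, 3_000L, 3_000L, 0L)); // messages leave, group stays
        System.out.println(reap(group, 5_000L, 3_000L, 0L)); // empty group is removed
    }
}
```

With a reaper timeout of 3000 ms, the sketch needs one pass to empty the group and a later pass to remove it, which is why the timeout and the schedule have to be checked together.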

Update 2

Here is a simple working example:

@SpringBootApplication
@EnableScheduling
public class So71617406Application {

    public static void main(String[] args) {
        SpringApplication.run(So71617406Application.class, args);
    }

    @Bean
    public MessageGroupStore jdbcMessageStoreSession(DataSource dataSource) {
        JdbcMessageStore jdbcMessageStore = new JdbcMessageStore(dataSource);
        jdbcMessageStore.setRegion("session");
        return jdbcMessageStore;
    }

    @Bean
    public MessageGroupStoreReaper messageGroupStoreSessionReaper(MessageGroupStore jdbcMessageStoreSession) {
        MessageGroupStoreReaper messageGroupStoreReaper = new MessageGroupStoreReaper();
        messageGroupStoreReaper.setTimeout(3000);
        messageGroupStoreReaper.setMessageGroupStore(jdbcMessageStoreSession);
        return messageGroupStoreReaper;
    }

    @Autowired
    @Lazy
    private MessageGroupStoreReaper messageGroupStoreSessionReaper;

    @Scheduled(fixedRate = 2000, initialDelay = 1000)
    @Transactional
    public void poll() {
        this.messageGroupStoreSessionReaper.run();
    }

    @Bean
    public IntegrationFlow aggregatorSession(MessageGroupStore jdbcMessageStoreSession) {
        return IntegrationFlows.from("aggregatorSessionChannel")
                .aggregate(g -> g
                        .transactional(true)
                        .sendPartialResultOnExpiry(true)
                        .expireGroupsUponCompletion(false)
                        .expireGroupsUponTimeout(false)
                        .messageStore(jdbcMessageStoreSession)
                        .correlationExpression("1")
                        .releaseStrategy(r -> r.size() == 2))
                .nullChannel();
    }

    @Bean
    ApplicationRunner applicationRunner(MessageChannel aggregatorSessionChannel) {
        return args -> {
            aggregatorSessionChannel.send(new GenericMessage<>("test1"));
            aggregatorSessionChannel.send(new GenericMessage<>("test2"));
        };
    }

}

Its log looks like this:

2022-04-04 11:10:40.149 DEBUG 2760 --- [           main] o.s.i.a.AggregatingMessageHandler        : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=test1, headers={id=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4, timestamp=1649085040146}]
2022-04-04 11:10:40.150 DEBUG 2760 --- [           main] o.s.i.a.AggregatingMessageHandler        : Handling message with correlationKey [1]: GenericMessage [payload=test1, headers={id=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4, timestamp=1649085040146}]
2022-04-04 11:10:40.166 DEBUG 2760 --- [           main] o.s.i.jdbc.store.JdbcMessageStore        : Inserting message with id key=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4
2022-04-04 11:10:40.174 DEBUG 2760 --- [           main] o.s.i.jdbc.store.JdbcMessageStore        : Inserting message with id key=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4 and created date=2022-04-04 11:10:40.164
2022-04-04 11:10:40.175 DEBUG 2760 --- [           main] o.s.i.jdbc.store.JdbcMessageStore        : Creating message group with id key=c4ca4238-a0b9-3382-8dcc-509a6f75849b and created date=2022-04-04 11:10:40.164
2022-04-04 11:10:40.176 DEBUG 2760 --- [           main] o.s.i.store.PersistentMessageGroup       : Lazy loading of group size for messageGroup: 1
2022-04-04 11:10:40.183 DEBUG 2760 --- [           main] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'aggregatorSessionChannel'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession'', message: GenericMessage [payload=test1, headers={id=a98e8c8b-be9e-ba5d-d996-e6bb6513afe4, timestamp=1649085040146}]
2022-04-04 11:10:40.183 DEBUG 2760 --- [           main] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'aggregatorSessionChannel'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession'', message: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:40.183 DEBUG 2760 --- [           main] o.s.i.a.AggregatingMessageHandler        : bean 'aggregatorSession.aggregator#0' for component 'aggregatorSession.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession' received message: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:40.183 DEBUG 2760 --- [           main] o.s.i.a.AggregatingMessageHandler        : Handling message with correlationKey [1]: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:40.184 DEBUG 2760 --- [           main] o.s.i.jdbc.store.JdbcMessageStore        : Inserting message with id key=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd
2022-04-04 11:10:40.185 DEBUG 2760 --- [           main] o.s.i.jdbc.store.JdbcMessageStore        : Inserting message with id key=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd and created date=2022-04-04 11:10:40.164
2022-04-04 11:10:40.185 DEBUG 2760 --- [           main] o.s.i.jdbc.store.JdbcMessageStore        : Updating MessageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.186 DEBUG 2760 --- [           main] o.s.i.store.PersistentMessageGroup       : Lazy loading of group size for messageGroup: 1
2022-04-04 11:10:40.187 DEBUG 2760 --- [           main] o.s.i.a.AggregatingMessageHandler        : Completing group with correlationKey [1]
2022-04-04 11:10:40.187 DEBUG 2760 --- [           main] o.s.i.store.PersistentMessageGroup       : Lazy loading of messages for messageGroup: 1
2022-04-04 11:10:40.191 DEBUG 2760 --- [           main] o.s.integration.channel.NullChannel      : message sent to null channel: GenericMessage [payload=[test1, test2], headers={id=5442d9a6-3549-041c-7f49-557b3df62749, timestamp=1649085040191}]
2022-04-04 11:10:40.191 DEBUG 2760 --- [           main] o.s.i.jdbc.store.JdbcMessageStore        : Completing MessageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.191 DEBUG 2760 --- [           main] o.s.i.jdbc.store.JdbcMessageStore        : Removing messages from group with group key=c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.193 DEBUG 2760 --- [           main] o.s.i.jdbc.store.JdbcMessageStore        : Updating MessageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:40.193 DEBUG 2760 --- [           main] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'aggregatorSessionChannel'; defined in: 'org.springframework.integration.stackoverflow.so71617406.So71617406Application'; from source: 'bean method aggregatorSession'', message: GenericMessage [payload=test2, headers={id=64f3d9be-30c7-1ff3-eb36-3d821f0c76fd, timestamp=1649085040183}]
2022-04-04 11:10:41.159 DEBUG 2760 --- [   scheduling-1] o.s.i.store.MessageGroupStoreReaper      : Expiring all messages older than timeout=3000 from message group store: org.springframework.integration.jdbc.store.JdbcMessageStore@31464a43
2022-04-04 11:10:43.156 DEBUG 2760 --- [   scheduling-1] o.s.i.store.MessageGroupStoreReaper      : Expiring all messages older than timeout=3000 from message group store: org.springframework.integration.jdbc.store.JdbcMessageStore@31464a43
2022-04-04 11:10:45.147 DEBUG 2760 --- [   scheduling-1] o.s.i.store.MessageGroupStoreReaper      : Expiring all messages older than timeout=3000 from message group store: org.springframework.integration.jdbc.store.JdbcMessageStore@31464a43
2022-04-04 11:10:45.148 DEBUG 2760 --- [   scheduling-1] o.s.i.store.PersistentMessageGroup       : Lazy loading of group size for messageGroup: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:45.149 DEBUG 2760 --- [   scheduling-1] o.s.i.a.AggregatingMessageHandler        : Removing empty group: c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:45.149 DEBUG 2760 --- [   scheduling-1] o.s.i.jdbc.store.JdbcMessageStore        : Removing relationships for the group with group key=c4ca4238-a0b9-3382-8dcc-509a6f75849b
2022-04-04 11:10:45.150 DEBUG 2760 --- [   scheduling-1] o.s.i.jdbc.store.JdbcMessageStore        : Deleting messages with group key=c4ca4238-a0b9-3382-8dcc-509a6f75849b

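Pay attention to the "Removing empty group:" entry. In your case it looks like, for some reason, the group is not empty after it has completed. Maybe your outputSessionProcessor makes a mistake and throws an error, so that, because of the transaction rollback, the completed messages are not removed from the database...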