Spring Integration: processing PollableChannel messages simultaneously

In my project I have a List of the following AccountWithFiles objects.

  @AllArgsConstructor
  @Getter
  class AccountWithFiles {

    private String account;
    private List<S3FileInfo> s3FileInfoList;
  }

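I want to process each AccountWithFiles separately in a new thread. Then I want to use split() to break up its s3FileInfoList and process the items one after another with a 20-minute delay, while the s3FileInfoList of each account is processed in parallel with the others.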

So I have the following DSL definition:

  @Bean
  public IntegrationFlow s3DownloadFlowEnhanced() {
    return IntegrationFlows.fromSupplier(s3FileInfoRepository::findAllGroupByAccount,
        c -> c.poller(Pollers.cron(time, TimeZone.getTimeZone(zone))).id("s3TEMPO"))
        .channel("manualS3EnhancedFlow")
        .split()
        .channel("myChannel")
        .get();
  }

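s3FileInfoRepository::findAllGroupByAccount returns a list of AccountWithFiles objects. I split that list and send the items to a channel created with MessageChannels and backed by an Executor (with a defined number of threads):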

  @Bean
  public MessageChannel myChannel() {
    return MessageChannels.publishSubscribe(Executors.newFixedThreadPool(10)).get();
  }

After that:

  @Bean
  public IntegrationFlow processEachAccountSeparately() {
    return IntegrationFlows.from("myChannel")
        .<AccountWithFiles, Object>transform(m -> m.getS3FileInfoList().stream().sorted(
              Comparator.comparing(i -> i.getOperationType() == FILE_OPERATION_TYPE.ADD))
              .collect(Collectors.toList()))
        .log()
        //.resequence()
        .split()
        .channel("bookItemsChannel")
        .get();
  }
  
  @Bean
  public PollableChannel bookItemsChannel(){
    return new QueueChannel();
  }
  
  @Bean
  public IntegrationFlow test() {
    return IntegrationFlows.from("bookItemsChannel")
        .delay("delayer.messageGroupId", d -> d
            .defaultDelay(25000L)
            .delayExpression("headers['delay']"))
        .log()
        .get();
  }


  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.initialize();

    pollerMetadata.setTaskExecutor(taskExecutor);
    pollerMetadata.setTrigger(new PeriodicTrigger(15000L));
    pollerMetadata.setMaxMessagesPerPoll(3);
    return pollerMetadata;
  }

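When the pollable channel receives the messages, they are delayed and processed one by one. I want my messages to be processed in parallel, based on the splitter in the s3DownloadFlowEnhanced flow. I know that a pollable channel separates the sender and the receiver of a message into different threads. Is there perhaps a workaround?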

In the processEachAccountSeparately flow I can see that each account gets its own thread:

    2021-06-24 15:33:34.585  INFO 56174 --- [pool-4-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=[S3FileInfo(fileName=sdfsdf, timeStamp=null, serviceName=null, accountLogin=login2, operationType=ADD)], headers={sequenceNumber=1, correlationId=36b132ac-7c5b-af66-96c0-2334a757c960, id=3bafdaa7-ed4c-087f-5f2c-cc114eae42cd, sequenceSize=2, timestamp=1624538014577}]
    2021-06-24 15:33:34.585  INFO 56174 --- [pool-4-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=[S3FileInfo(fileName=sdfjsjfj, timeStamp=null, serviceName=null, accountLogin=login1, operationType=DELETE), S3FileInfo(fileName=s3/outgoing/file2, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=DELETE), S3FileInfo(fileName=outgoing/s3/ipass.xlsx, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=ADD), S3FileInfo(fileName=dsfsdf, timeStamp=null, serviceName=null, accountLogin=login1, operationType=ADD)], headers={sequenceNumber=2, correlationId=36b132ac-7c5b-af66-96c0-2334a757c960, id=d8506721-2cfd-b6da-d353-4fb8bd5744fb, sequenceSize=2, timestamp=1624538014577}]

However, the PollableChannel hands the messages over one by one:

2021-06-24 15:33:46.328  INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=sdfjsjfj, timeStamp=null, serviceName=null, accountLogin=login1, operationType=DELETE), headers={sequenceNumber=1, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=7f8bd9a6-25ce-0bb2-c3f3-581d823d8fce, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:33:46.329  INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=sdfsdf, timeStamp=null, serviceName=null, accountLogin=login2, operationType=ADD), headers={sequenceNumber=1, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 1, 2]], correlationId=3bafdaa7-ed4c-087f-5f2c-cc114eae42cd, id=f697b52b-1053-51aa-232f-88bb602dc1c9, sequenceSize=1, timestamp=1624538014585}]
2021-06-24 15:33:46.329  INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=s3/outgoing/file2, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=DELETE), headers={sequenceNumber=2, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=a6754c98-fce0-f132-664a-65d61f553ae2, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:34:01.333  INFO 56174 --- [lTaskExecutor-4] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=outgoing/s3/ipass.xlsx, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=ADD), headers={sequenceNumber=3, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=71fa915a-fcaa-3d00-023b-5cf51be3b183, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:34:01.333  INFO 56174 --- [lTaskExecutor-4] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=dsfsdf, timeStamp=null, serviceName=null, accountLogin=login1, operationType=ADD), headers={sequenceNumber=4, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=7c513e23-5484-4f61-b7d3-362648c7b89c, sequenceSize=4, timestamp=1624538014585}]

What I want is something like this:

[pool-4-thread-1] simultaneously 
[pool-4-thread-2] simultaneously
[pool-4-thread-2] 20 min delay
[pool-4-thread-2] 20 min delay
[pool-4-thread-2] 20 min delay

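Your first step, splitting the list of AccountWithFiles, is correct. You then say you want to split s3FileInfoList and process its items sequentially, one by one, so why put them into a QueueChannel? In that case the regular default DirectChannel is enough.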
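
The threading model behind that advice can be sketched with plain JDK classes. This is only an analogy, not Spring Integration code: the thread pool stands in for the executor-backed myChannel, and the inner loop stands in for a splitter whose output stays on the caller's thread, as it would with a DirectChannel. The class and method names (PerAccountSequentialDemo, process, onlyAccount) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK analogy (hypothetical names, not Spring Integration API):
// one task per account runs on a pool, and the files of that account
// are handled in order on the thread that picked up the account.
class PerAccountSequentialDemo {

    /** Returns entries of the form "account:file" in the order they were handled. */
    static List<String> process(Map<String, List<String>> filesByAccount, int threads) {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Map.Entry<String, List<String>> entry : filesByAccount.entrySet()) {
                // Accounts are submitted independently, so they may run in parallel.
                futures.add(pool.submit(() -> {
                    // Files of one account never leave this thread, so their order is kept.
                    for (String file : entry.getValue()) {
                        handled.add(entry.getKey() + ":" + file);
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait until every account has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(handled);
    }

    /** Filters the handled entries down to a single account, keeping their order. */
    static List<String> onlyAccount(List<String> handled, String account) {
        List<String> result = new ArrayList<>();
        for (String entry : handled) {
            if (entry.startsWith(account + ":")) {
                result.add(entry);
            }
        }
        return result;
    }
}
```

Entries of different accounts may interleave in the combined result, but the entries of any single account always appear in their original order, which is the property the question asks for before the delay comes into play.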

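However, you then go on to delay(), which does not block the current thread; it schedules a task to run in the future on a separate thread. So you probably need to rethink your solution: with the current approach, even if you stop using a queue for bookItemsChannel, the delayed messages still will not be processed sequentially. When they all have the same scheduled time, there is no guarantee which one the TaskScheduler will execute first.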
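
One possible direction for that rethink, offered as a sketch and not as something the answer prescribes, is to make sure no two items of the same account share a scheduled time, for example by deriving each item's delay from its position in the split list. The plain-JDK code below illustrates the idea; it is not Spring Integration code, and StaggeredDelayDemo, delayFor and runAccount are hypothetical names. In the flow above this might correspond to computing the delay header from the sequenceNumber header, but that mapping is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-JDK sketch (hypothetical names): each item of an account gets its own
// delay, so items never compete for the same scheduled time.
class StaggeredDelayDemo {

    /** Delay for the item at a zero-based position: 0, interval, 2 * interval, ... */
    static long delayFor(int position, long intervalMillis) {
        return position * intervalMillis;
    }

    /** Schedules the files of one account and returns them in execution order. */
    static List<String> runAccount(List<String> files, long intervalMillis) {
        List<String> executed = Collections.synchronizedList(new ArrayList<>());
        // One scheduler thread per account keeps that account's items strictly ordered.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(files.size());
        try {
            for (int i = 0; i < files.size(); i++) {
                String file = files.get(i);
                scheduler.schedule(() -> {
                    executed.add(file);
                    done.countDown();
                }, delayFor(i, intervalMillis), TimeUnit.MILLISECONDS);
            }
            done.await(); // block until every scheduled item has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
        return new ArrayList<>(executed);
    }
}
```

With a 20-minute interval (1,200,000 ms) the items of one account would be scheduled at 0, 20 and 40 minutes, and so on, while each account could run runAccount on its own thread. Note that this spaces out the start times only; if handling one item can take longer than the interval, a different design is needed.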