Run multiple Spring Integration Flows in parallel

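I have an application that polls multiple directories and then sends job requests to Spring Batch; each directory is registered as a separate flow. Is it possible to run these in parallel? My use case is that each directory is tied to a different business entity, so when one flow gets stuck on a malformed file, or because that entity's MQ broker is unavailable, the others need to keep working.
I registered the flows using IntegrationFlowContext.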

@Configuration
@RequiredArgsConstructor
@Slf4j
public class IntegrationConfigSO implements CommandLineRunner {
    
    private final HalFileAdapterConfig config;
    private final JobRepository jobRepository;
    private final BatchJobs batchJobs;
    private final ApplicationIntegrationEventPublisher eventPublisher;
    private final IntegrationFlowContext flowContext;
    
    @Override
    public void run(String... args) throws Exception {
        registerFlows();
    }
    
    public void registerFlows() {
        Arrays.stream(config.getSystemsEnabled())
                .map(this::flow)
                .forEach(flow -> flowContext.registration(flow)
                        .id(UUID.randomUUID().toString())
                        .useFlowIdAsPrefix()
                        .register()
                );
        
    }
    
    public IntegrationFlow flow(String systemId) {
        return IntegrationFlows
                .from(
                        fileReadingMessageSource(systemId),
                        c -> c.poller(Pollers.fixedDelay(config.getPollTimeSeconds(), TimeUnit.SECONDS)
                                .maxMessagesPerPoll(config.getMaxFilesPerPoll())))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .channel("jobReplyChannel")
                .get();
    }
    
    
    public MessageSource<File> fileReadingMessageSource(String systemId) {
        FileReadingMessageSource source = new FileReadingMessageSource(getCustomFileComparator());
        source.setAutoCreateDirectory(true);
        source.setDirectory(new File(config.getBaseDirectory() + File.separatorChar + systemId));
        source.setScanner(directoryScanner());
        return source;
    }
    
    @Bean
    public DirectoryScanner directoryScanner() {
        CustomRecursiveDirScanner scanner = new CustomRecursiveDirScanner(config);
        CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
        filters.addFilter(new AcceptOnceFileListFilter<>());
        scanner.setFilter(filters);
        return scanner;
    }
    
    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest(config, eventPublisher);
        fileMessageToJobRequest.setJob(batchJobs.job());
        return fileMessageToJobRequest;
    }
    
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
        jobLaunchingGateway.setOutputChannel(jobReplyChannel());
        return jobLaunchingGateway;
    }
    
    @Bean
    public MessageChannel jobReplyChannel() {
        return new DirectChannel();
    }
    
}

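Yes. This is a valid and achievable use case. Pollers in Spring Integration rely on a TaskScheduler and its thread pool. So, to make sure all the parallel flows keep working, you need to make that thread pool large enough.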
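To illustrate why the pool size matters, here is a minimal sketch that uses only `java.util.concurrent` instead of Spring's TaskScheduler, so it shows the principle rather than Spring Integration's actual internals. The class name `PollerPoolDemo` and the method `healthyPolls` are hypothetical names invented for this example. One "poller" blocks forever (standing in for a flow stuck on a bad file or a missing broker) while a second one tries to keep polling on the same scheduler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PollerPoolDemo {

    /**
     * Schedules one blocked poller and one healthy poller on a scheduler
     * with the given number of threads, waits briefly, and returns how
     * many times the healthy poller managed to run.
     */
    public static int healthyPolls(int poolSize) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch stuckStarted = new CountDownLatch(1);
        AtomicInteger healthy = new AtomicInteger();
        try {
            // "Stuck" poller: occupies one scheduler thread until released.
            scheduler.scheduleWithFixedDelay(() -> {
                stuckStarted.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 0, 10, TimeUnit.MILLISECONDS);

            // Make sure the stuck poller already holds a thread.
            stuckStarted.await();

            // Healthy poller: only runs if a free scheduler thread exists.
            scheduler.scheduleWithFixedDelay(healthy::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);

            Thread.sleep(300);
            return healthy.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            release.countDown();
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // With a single thread the healthy poller is starved.
        System.out.println("pool=1 healthy polls: " + healthyPolls(1));
        // With two threads it keeps running despite the stuck poller.
        System.out.println("pool=2 healthy polls: " + healthyPolls(2));
    }
}
```

The same reasoning applies to the configuration in the question: the job launcher there uses a SyncTaskExecutor, so each launched job runs on the polling thread and holds a scheduler thread until the job finishes.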

For details, see the documentation: https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#namespace-taskscheduler

There is also a spring.integration.taskScheduler.poolSize global integration property (covered in the next section of that documentation).

If you use Spring Boot, see the TaskScheduler auto-configuration: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.task-execution-and-scheduling
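As a rough sketch of the two options mentioned above, the pool size could be raised with one of the following property settings. The value 10 is an arbitrary assumption; pick a number at least as large as the number of flows that may block at the same time. Which of the two applies depends on whether Spring Boot's auto-configured scheduler is in use, so check the linked documentation for your versions.

```properties
# Option 1: plain Spring Integration.
# File: META-INF/spring.integration.properties on the classpath.
spring.integration.taskScheduler.poolSize=10

# Option 2: Spring Boot auto-configured TaskScheduler.
# File: application.properties
spring.task.scheduling.pool.size=10
```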