运行 多个 Spring 并行集成流程
Run multiple Spring Integration Flows in parallel
我有一个应用程序可以轮询多个目录,然后将作业请求发送到 Sring Batch,每个目录都注册为不同的 Flow。是否可以并行运行这个?我有这个用例,因为每个目录都连接到不同的业务实体,当流程因格式错误的文件或特定实体的 mq 代理不存在而卡住时,其他人需要继续工作。
我使用 IntegrationFlowContext 注册了流程。
@Configuration
@RequiredArgsConstructor
@Slf4j
public class IntegrationConfigSO implements CommandLineRunner {
private final HalFileAdapterConfig config;
private final JobRepository jobRepository;
private final BatchJobs batchJobs;
private final ApplicationIntegrationEventPublisher eventPublisher;
private final IntegrationFlowContext flowContext;
@Override
public void run(String... args) throws Exception {
registerFlows();
}
public void registerFlows() {
Arrays.stream(config.getSystemsEnabled())
.map(this::flow)
.forEach(flow -> flowContext.registration(flow)
.id(UUID.randomUUID().toString())
.useFlowIdAsPrefix()
.register()
);
}
public IntegrationFlow flow(String systemId) {
return IntegrationFlows
.from(
fileReadingMessageSource(systemId),
c -> c.poller(Pollers.fixedDelay(config.getPollTimeSeconds(), TimeUnit.SECONDS)
.maxMessagesPerPoll(config.getMaxFilesPerPoll())))
.transform(fileMessageToJobRequest())
.handle(jobLaunchingGateway())
.channel("jobReplyChannel")
.get();
}
public MessageSource<File> fileReadingMessageSource(String systemId) {
FileReadingMessageSource source = new FileReadingMessageSource(getCustomFileComparator());
source.setAutoCreateDirectory(true);
source.setDirectory(new File(config.getBaseDirectory() + File.separatorChar + systemId));
source.setScanner(directoryScanner());
return source;
}
@Bean
public DirectoryScanner directoryScanner() {
CustomRecursiveDirScanner scanner = new CustomRecursiveDirScanner(config);
CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
filters.addFilter(new AcceptOnceFileListFilter<>());
scanner.setFilter(filters);
return scanner;
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest(config, eventPublisher);
fileMessageToJobRequest.setJob(batchJobs.job());
return fileMessageToJobRequest;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
jobLaunchingGateway.setOutputChannel(jobReplyChannel());
return jobLaunchingGateway;
}
@Bean
public MessageChannel jobReplyChannel() {
return new DirectChannel();
}
}
是的。它是有效的、可能的并且有效 use-case。 Spring 集成中的轮询器依赖于 TaskScheduler
及其线程池。因此,要确保所有并行流程都能正常工作,您需要使线程池足够大。
有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#namespace-taskscheduler
还有一个 spring.integration.taskScheduler.poolSize
全球整合 属性。 (该文档的下一部分)。
如果使用Spring引导,请参阅TaskScheduler
auto-configuration:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.task-execution-and-scheduling
我有一个应用程序可以轮询多个目录,然后将作业请求发送到 Sring Batch,每个目录都注册为不同的 Flow。是否可以并行运行这个?我有这个用例,因为每个目录都连接到不同的业务实体,当流程因格式错误的文件或特定实体的 mq 代理不存在而卡住时,其他人需要继续工作。
我使用 IntegrationFlowContext 注册了流程。
@Configuration
@RequiredArgsConstructor
@Slf4j
public class IntegrationConfigSO implements CommandLineRunner {
private final HalFileAdapterConfig config;
private final JobRepository jobRepository;
private final BatchJobs batchJobs;
private final ApplicationIntegrationEventPublisher eventPublisher;
private final IntegrationFlowContext flowContext;
@Override
public void run(String... args) throws Exception {
registerFlows();
}
public void registerFlows() {
Arrays.stream(config.getSystemsEnabled())
.map(this::flow)
.forEach(flow -> flowContext.registration(flow)
.id(UUID.randomUUID().toString())
.useFlowIdAsPrefix()
.register()
);
}
public IntegrationFlow flow(String systemId) {
return IntegrationFlows
.from(
fileReadingMessageSource(systemId),
c -> c.poller(Pollers.fixedDelay(config.getPollTimeSeconds(), TimeUnit.SECONDS)
.maxMessagesPerPoll(config.getMaxFilesPerPoll())))
.transform(fileMessageToJobRequest())
.handle(jobLaunchingGateway())
.channel("jobReplyChannel")
.get();
}
public MessageSource<File> fileReadingMessageSource(String systemId) {
FileReadingMessageSource source = new FileReadingMessageSource(getCustomFileComparator());
source.setAutoCreateDirectory(true);
source.setDirectory(new File(config.getBaseDirectory() + File.separatorChar + systemId));
source.setScanner(directoryScanner());
return source;
}
@Bean
public DirectoryScanner directoryScanner() {
CustomRecursiveDirScanner scanner = new CustomRecursiveDirScanner(config);
CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
filters.addFilter(new AcceptOnceFileListFilter<>());
scanner.setFilter(filters);
return scanner;
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest(config, eventPublisher);
fileMessageToJobRequest.setJob(batchJobs.job());
return fileMessageToJobRequest;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
jobLaunchingGateway.setOutputChannel(jobReplyChannel());
return jobLaunchingGateway;
}
@Bean
public MessageChannel jobReplyChannel() {
return new DirectChannel();
}
}
是的。它是有效的、可能的并且有效 use-case。 Spring 集成中的轮询器依赖于 TaskScheduler
及其线程池。因此,要确保所有并行流程都能正常工作,您需要使线程池足够大。
有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#namespace-taskscheduler
还有一个 spring.integration.taskScheduler.poolSize
全球整合 属性。 (该文档的下一部分)。
如果使用Spring引导,请参阅TaskScheduler
auto-configuration:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.task-execution-and-scheduling