如何在 spring 批处理中制作非阻塞项目处理器(不仅与 TaskExecuter 异步)?
How to make a non-blocking item processor in spring batch (Not only asynchronous with a TaskExecuter)?
Spring 批次有一个名为 AsyncItemProcessor
的工具。它只是用 ItemProcessor
包装一个 运行 并用 TaskExecutor
包装它,因此它可以 运行 异步。我想在此 ItemProcessor
中进行休息调用,问题是此 TaskExecutor
中进行休息调用的每个线程都将被阻塞,直到获得响应。我想让它成为非阻塞的,类似于反应式范例。
我有一个 ItemProcessor 调用休息点并得到它的响应:
@Bean
public ItemProcessor<String, String> testItemProcessor() {
return item -> {
String url = "http://localhost:8787/test";
try {
// it's a long time process and take a lot of time
String response = restTemplate.exchange(new URI(url), HttpMethod.GET, new RequestEntity(HttpMethod.GET, new URI(url)), String.class).getBody();
return response;
} catch (URISyntaxException e) {
e.printStackTrace();
return null;
}
};
}
现在我用AsyncItemProcessor
包裹它:
@Bean
public AsyncItemProcessor testAsyncItemProcessor() throws Exception {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(testItemProcessor());
asyncItemProcessor.setTaskExecutor(testThreadPoolTaskExecutor());
asyncItemProcessor.afterPropertiesSet();
return asyncItemProcessor;
}
@Bean
public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(50);
threadPoolTaskExecutor.setMaxPoolSize(100);
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
return threadPoolTaskExecutor;
}
我用了一个ThreadPoolTaskExecutor
作为TaskExecuter
。
这是ItemWriter
:
@Bean
public ItemWriter<String> testItemWriter() {
return items -> {
// I write them to a file and a database, but for simplicity:
for (String item : items) {
System.out.println(item);
}
};
}
@Bean
public AsyncItemWriter asyncTestItemWriter() throws Exception {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(testItemWriter());
asyncItemWriter.afterPropertiesSet();
return asyncItemWriter;
}
步骤和作业配置:
@Bean
public Step testStep() throws Exception {
return stepBuilderFactory.get("testStep")
.<String, String>chunk(1000)
.reader(testItemReader())
.processor(testAsyncItemProcessor())
.writer(asyncTestItemWriter())
.build();
}
@Bean
public Job testJob() throws Exception {
return jobBuilderFactory.get("testJob")
.start(testStep())
.build();
}
ItemReader
是一个简单的 ListItemReader
:
@Bean
public ItemReader<String> testItemReader() {
List<String> integerList = new ArrayList<>();
for (int i=0; i<10000; i++) {
integerList.add(String.valueOf(i));
}
return new ListItemReader(integerList);
}
现在我有一个有 50~100 个线程的 ThreadPoolTaskExecutor
。 ItemProcessor
中的每个线程进行 rest 调用并 waits/blocks 接收来自服务器的响应。有没有办法让这些 calls/process 非阻塞?如果是,我应该如何设计ItemWriter
?在 ItemWriter
中,我想将 ItemProcessor
的结果写入文件和数据库。
每个块的大小为 1000,我可以等到其中的所有记录都得到处理,但我不想在块内的每个 rest 调用中阻塞一个线程。有什么办法可以做到吗?
我知道 Spring rest 模板是使进程阻塞的模板,应该使用 webclient,但是 spring 批处理中是否有任何等效组件(而不是AsyncItemProcessor/AsyncItemWriter) 的 Web 客户端?
不,Spring Batch 中不支持响应式编程,这里有一个开放的功能请求:https://github.com/spring-projects/spring-batch/issues/1008。
请注意,响应式意味着整个堆栈都应该是响应式的,从批处理工件(reader、处理器、编写器、侦听器等)到基础设施 bean(作业存储库、事务管理器等),不仅是您的项目处理器和作家。
此外,当前的块处理模型实际上与反应式范式不兼容。原因是 ChunkOrientedTasklet 基本上使用了两个合作者:
- A
ChunkProvider
提供项目块(将项目读取委托给 ItemReader
)
- A
ChunkProcessor
处理块(将处理和写入分别委托给 ItemProcessor
/ItemWriter
)
这里是代码的简化版本:
Chunk inputs = chunkProvider.provide();
chunkProcessor.process(inputs);
如您所见,该步骤将等待 chunkProcessor(处理器 + 写入器)处理完整个块,然后再读取下一个块。因此,在您的情况下,即使您在处理器 + 编写器中使用非阻塞 API,您的步骤也会在读取下一个块之前等待块被完全处理(除了等待与作业存储库和事务管理器的阻塞交互) .
Spring 批次有一个名为 AsyncItemProcessor
的工具。它只是用 ItemProcessor
包装一个 运行 并用 TaskExecutor
包装它,因此它可以 运行 异步。我想在此 ItemProcessor
中进行休息调用,问题是此 TaskExecutor
中进行休息调用的每个线程都将被阻塞,直到获得响应。我想让它成为非阻塞的,类似于反应式范例。
我有一个 ItemProcessor 调用休息点并得到它的响应:
@Bean
public ItemProcessor<String, String> testItemProcessor() {
return item -> {
String url = "http://localhost:8787/test";
try {
// it's a long time process and take a lot of time
String response = restTemplate.exchange(new URI(url), HttpMethod.GET, new RequestEntity(HttpMethod.GET, new URI(url)), String.class).getBody();
return response;
} catch (URISyntaxException e) {
e.printStackTrace();
return null;
}
};
}
现在我用AsyncItemProcessor
包裹它:
@Bean
public AsyncItemProcessor testAsyncItemProcessor() throws Exception {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(testItemProcessor());
asyncItemProcessor.setTaskExecutor(testThreadPoolTaskExecutor());
asyncItemProcessor.afterPropertiesSet();
return asyncItemProcessor;
}
@Bean
public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(50);
threadPoolTaskExecutor.setMaxPoolSize(100);
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
return threadPoolTaskExecutor;
}
我用了一个ThreadPoolTaskExecutor
作为TaskExecuter
。
这是ItemWriter
:
@Bean
public ItemWriter<String> testItemWriter() {
return items -> {
// I write them to a file and a database, but for simplicity:
for (String item : items) {
System.out.println(item);
}
};
}
@Bean
public AsyncItemWriter asyncTestItemWriter() throws Exception {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(testItemWriter());
asyncItemWriter.afterPropertiesSet();
return asyncItemWriter;
}
步骤和作业配置:
@Bean
public Step testStep() throws Exception {
return stepBuilderFactory.get("testStep")
.<String, String>chunk(1000)
.reader(testItemReader())
.processor(testAsyncItemProcessor())
.writer(asyncTestItemWriter())
.build();
}
@Bean
public Job testJob() throws Exception {
return jobBuilderFactory.get("testJob")
.start(testStep())
.build();
}
ItemReader
是一个简单的 ListItemReader
:
@Bean
public ItemReader<String> testItemReader() {
List<String> integerList = new ArrayList<>();
for (int i=0; i<10000; i++) {
integerList.add(String.valueOf(i));
}
return new ListItemReader(integerList);
}
现在我有一个有 50~100 个线程的 ThreadPoolTaskExecutor
。 ItemProcessor
中的每个线程进行 rest 调用并 waits/blocks 接收来自服务器的响应。有没有办法让这些 calls/process 非阻塞?如果是,我应该如何设计ItemWriter
?在 ItemWriter
中,我想将 ItemProcessor
的结果写入文件和数据库。
每个块的大小为 1000,我可以等到其中的所有记录都得到处理,但我不想在块内的每个 rest 调用中阻塞一个线程。有什么办法可以做到吗?
我知道 Spring rest 模板是使进程阻塞的模板,应该使用 webclient,但是 spring 批处理中是否有任何等效组件(而不是AsyncItemProcessor/AsyncItemWriter) 的 Web 客户端?
不,Spring Batch 中不支持响应式编程,这里有一个开放的功能请求:https://github.com/spring-projects/spring-batch/issues/1008。
请注意,响应式意味着整个堆栈都应该是响应式的,从批处理工件(reader、处理器、编写器、侦听器等)到基础设施 bean(作业存储库、事务管理器等),不仅是您的项目处理器和作家。
此外,当前的块处理模型实际上与反应式范式不兼容。原因是 ChunkOrientedTasklet 基本上使用了两个合作者:
- A
ChunkProvider
提供项目块(将项目读取委托给ItemReader
) - A
ChunkProcessor
处理块(将处理和写入分别委托给ItemProcessor
/ItemWriter
)
这里是代码的简化版本:
Chunk inputs = chunkProvider.provide();
chunkProcessor.process(inputs);
如您所见,该步骤将等待 chunkProcessor(处理器 + 写入器)处理完整个块,然后再读取下一个块。因此,在您的情况下,即使您在处理器 + 编写器中使用非阻塞 API,您的步骤也会在读取下一个块之前等待块被完全处理(除了等待与作业存储库和事务管理器的阻塞交互) .