How to add AsyncItemWriter in Spring Batch to a Step correctly?
In the Spring Batch job shown below, I am trying to use an AsyncItemWriter:
@Bean
public Step readWriteStep() throws Exception {
    return stepBuilderFactory.get("readWriteStep")
            .listener(listener)
            .<Data, Data>chunk(10)
            .reader(dataItemReader())
            .writer(dataAsyncWriter())
            .build();
}

@Bean
public AsyncItemWriter<Data> dataAsyncWriter() throws Exception {
    AsyncItemWriter<Data> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(dataItemWriter);
    asyncItemWriter.afterPropertiesSet();
    return asyncItemWriter;
}
If I try it this way, IntelliJ complains:
Required type: ItemWriter <? super Data>
Provided: AsyncItemWriter <Data>
When I change .<Data, Data>chunk(10) to .<Data, Future<Data>>chunk(10), IntelliJ no longer shows a warning, but when I run the job I get the following exception:
java.lang.ClassCastException: Data cannot be cast to class java.util.concurrent.Future Data is in unnamed module of loader 'app';
java.util.concurrent.Future is in module java.base of loader 'bootstrap'
What are the first and second type parameters in .<Data, Data>chunk(10)? Is the first the type the processor receives and the second the type the processor returns?
How can I solve this problem?
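Your sample should compile if you change the step definition to use the following:
.<Data, Future<Data>>chunk(10)
That said, I'm not sure this would work correctly at runtime, since AsyncItemWriter expects to unwrap items from their enclosing Future, and those futures are created by an AsyncItemProcessor.
In other words, AsyncItemWriter and AsyncItemProcessor should be used together for this pattern to work. Here is a quick example that uses both: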
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class SO72477556 {

    @Bean
    public ItemReader<Data> dataItemReader() {
        return new ListItemReader<Data>(Arrays.asList());
    }

    @Bean
    public ItemProcessor<Data, Data> dataItemProcessor() {
        return new ItemProcessor<Data, Data>() {
            @Override
            public Data process(Data item) throws Exception {
                return item;
            }
        };
    }

    @Bean
    public AsyncItemProcessor<Data, Data> asyncDataItemProcessor() {
        AsyncItemProcessor<Data, Data> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(dataItemProcessor());
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemWriter<Data> dataItemWriter() {
        return new ItemWriter<Data>() {
            @Override
            public void write(List<? extends Data> items) throws Exception {
            }
        };
    }

    @Bean
    public AsyncItemWriter<Data> dataAsyncWriter() throws Exception {
        AsyncItemWriter<Data> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(dataItemWriter());
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

    @Bean
    public Step readWriteStep(StepBuilderFactory stepBuilderFactory) throws Exception {
        return stepBuilderFactory.get("readWriteStep")
                .<Data, Future<Data>>chunk(10)
                .reader(dataItemReader())
                .processor(asyncDataItemProcessor())
                .writer(dataAsyncWriter())
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) throws Exception {
        return jobs.get("job")
                .start(readWriteStep(steps))
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(SO72477556.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    static class Data {}
}
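As for the type parameters: in .<I, O>chunk(n), I is the item type the reader produces and O is the item type the writer receives, so a processor, when present, maps I to O. The plain-Java sketch below is only a simplified model of the pattern, not Spring Batch's actual implementation; the class AsyncPatternSketch and its methods processAsync, unwrap, run and failsWithoutAsyncProcessor are hypothetical names made up for illustration. It shows the processor side wrapping each result in a Future, the writer side unwrapping it again, and why declaring Future<Data> as the output type without an async processor would plausibly end in the ClassCastException from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical, simplified model of the async processor/writer pairing (no Spring involved).
public class AsyncPatternSketch {

    // Processor side: run the delegate on another thread and hand back a Future for its result.
    public static <I, O> Future<O> processAsync(ExecutorService executor, Function<I, O> delegate, I item) {
        Callable<O> task = () -> delegate.apply(item);
        return executor.submit(task);
    }

    // Writer side: wait for each Future and collect the unwrapped items for the delegate writer.
    public static <T> List<T> unwrap(List<? extends Future<T>> futures) {
        List<T> items = new ArrayList<>();
        for (Future<T> future : futures) {
            try {
                items.add(future.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException("Async processing failed", e);
            }
        }
        return items;
    }

    // Both halves together: every item is wrapped by the processor side and unwrapped by the writer side.
    public static List<String> run(List<String> input) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Function<String, String> delegate = s -> s.toUpperCase();
            List<Future<String>> chunk = new ArrayList<>();
            for (String item : input) {
                chunk.add(processAsync(executor, delegate, item));
            }
            return unwrap(chunk);
        } finally {
            executor.shutdown();
        }
    }

    // Writer side alone: generics are erased at runtime, so plain items reach the
    // writer unchanged and the cast to Future fails when the writer iterates over them.
    @SuppressWarnings("unchecked")
    public static boolean failsWithoutAsyncProcessor(List<String> plainItems) {
        List<Future<String>> mistyped = (List<Future<String>>) (List<?>) plainItems;
        try {
            unwrap(mistyped);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b")));                   // prints [A, B]
        System.out.println(failsWithoutAsyncProcessor(Arrays.asList("a"))); // prints true
    }
}
```

In this model the declared output type only satisfies the compiler; something still has to produce the Future instances at runtime, which is the role AsyncItemProcessor plays in the configuration above.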