如何将 Spring Batch 中的 AsyncItemWriter 正确添加到步骤中?

How to add AsyncItemWriter in Spring Batch to a Step correctly?

在如下所示的 Spring 批处理作业中,我正在尝试使用 AsyncWriter

@Bean
public Step readWriteStep() throws Exception {
    return stepBuilderFactory.get("readWriteStep")
        .listener(listener)
        .<Data, Data>chunk(10)
        .reader(dataItemReader())
        .writer(dataAsyncWriter())
        .build();
}

@Bean
public AsyncItemWriter<Data> dataAsyncWriter() throws Exception {
    AsyncItemWriter<Data> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(dataItemWriter);
    asyncItemWriter.afterPropertiesSet();
    return asyncItemWriter;
}

如果我这样尝试,intelliJ 会抱怨:

Required type: ItemWriter <? super Data>
Provided: AsyncItemWriter <Data>

当我将 .<Data, Data>chunk(10) 更改为 .<Data, Future<Data>>chunk(10) 时,intelliJ 没有发出任何警告,但是当我 运行 作业时,我得到以下异常:

java.lang.ClassCastException: Data cannot be cast to class java.util.concurrent.Future Data is in unnamed module of loader 'app'; 
java.util.concurrent.Future is in module java.base of loader 'bootstrap'

这里的第一个和第二个参数是什么? .<Data, Data>chunk(10)?

这两个参数是处理器获取的参数,第二个参数是处理器返回的参数吗?

我该如何解决这个问题?

如果您将步骤定义更改为使用以下内容,您的示例应该可以编译:

.<Data, Future<Data>>chunk(10)

就是说,我不确定这是否会在运行时正常工作,因为 AsyncItemWriter 预计会从其封闭的 Future 中展开项目,而这些 Future 是由 AsyncItemProcessor.

创建

换句话说,AsyncItemWriterAsyncItemProcessor 应该结合使用才能使此模式起作用。这是一个关于它们的简单示例:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class SO72477556 {

    @Bean
    public ItemReader<Data> dataItemReader() {
        return new ListItemReader<Data>(Arrays.asList());
    }


    @Bean
    public ItemProcessor<Data, Data> dataItemProcessor() {
        return new ItemProcessor<Data, Data>() {
            @Override
            public Data process(Data item) throws Exception {
                return item;
            }
        };
    }

    @Bean
    public AsyncItemProcessor<Data, Data> asyncDataItemProcessor() {
        AsyncItemProcessor<Data, Data> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(dataItemProcessor());
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemWriter<Data> dataItemWriter() {
        return new ItemWriter<Data>() {
            @Override
            public void write(List<? extends Data> items) throws Exception {

            }
        };
    }

    @Bean
    public AsyncItemWriter<Data> dataAsyncWriter() throws Exception {
        AsyncItemWriter<Data> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(dataItemWriter());
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

    @Bean
    public Step readWriteStep(StepBuilderFactory stepBuilderFactory) throws Exception {
        return stepBuilderFactory.get("readWriteStep")
                .<Data, Future<Data>>chunk(10)
                .reader(dataItemReader())
                .processor(asyncDataItemProcessor())
                .writer(dataAsyncWriter())
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) throws Exception {
        return jobs.get("job")
                .start(readWriteStep(steps))
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(SO72477556.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    static class Data {}

}