Spring Batch:如何链接多个类型不同的 ItemProcessor?
Spring Batch: How to chain multiple ItemProcessors with different types?
我必须按如下方式组成 2 个处理器:
processor 1
实现 ItemProcessor<A,B>
接口(转换数据)。
processor 2
实现 ItemProcessor<B,B>
接口(处理转换后的数据)。
CompositeItemProcessor<I, O>
要求各个委托处理器的类型相同,而且在将其传递给 Step
时,步骤已经配置为固定类型 <A,B>。
我如何将这些类型不同的处理器链接起来,并将其设置为 step
的处理器?
您需要使用 <A, B>
声明您的步骤以及复合处理器。这是一个简单的例子:
import java.util.Arrays;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Minimal Spring Batch configuration that chains two item processors with
 * different generic signatures ({@code ItemProcessor<A, B>} followed by
 * {@code ItemProcessor<B, B>}) behind one composite {@code ItemProcessor<A, B>}.
 *
 * <p>The step is declared with {@code <A, B>}: the reader produces {@link A},
 * the composite processor turns each {@link A} into a {@link B}, and the
 * writer consumes {@link B}.
 */
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    /** Reads two in-memory {@link A} items named "a1" and "a2". */
    @Bean
    public ItemReader<A> itemReader() {
        return new ListItemReader<>(Arrays.asList(new A("a1"), new A("a2")));
    }

    /** First stage: converts an {@link A} into a {@link B} carrying the same name. */
    @Bean
    public ItemProcessor<A, B> itemProcessor1() {
        return item -> new B(item.name);
    }

    /** Second stage: receives the already-converted {@link B}; currently a pass-through. */
    @Bean
    public ItemProcessor<B, B> itemProcessor2() {
        return item -> item; // TODO process item as needed
    }

    /**
     * Combines both stages. The composite is typed by the input of the first
     * delegate ({@link A}) and the output of the last delegate ({@link B});
     * the delegates themselves do not need to share a signature.
     */
    @Bean
    public ItemProcessor<A, B> compositeItemProcessor() {
        CompositeItemProcessor<A, B> compositeItemProcessor = new CompositeItemProcessor<>();
        // Delegates are listed in the order they should be applied.
        compositeItemProcessor.setDelegates(Arrays.asList(itemProcessor1(), itemProcessor2()));
        return compositeItemProcessor;
    }

    /** Prints the name of every {@link B} in the chunk to standard output. */
    @Bean
    public ItemWriter<B> itemWriter() {
        return items -> {
            for (B item : items) {
                System.out.println("item = " + item.name);
            }
        };
    }

    /**
     * Builds a single-step job named "job" whose step "step" processes items in
     * chunks of 2 using the reader, composite processor and writer beans above.
     *
     * @param jobs factory used to create the job
     * @param steps factory used to create the step
     * @return the assembled job
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<A, B>chunk(2)
                        .reader(itemReader())
                        .processor(compositeItemProcessor())
                        .writer(itemWriter())
                        .build())
                .build();
    }

    /**
     * Boots the application context, launches the job with empty parameters
     * and closes the context afterwards.
     *
     * @param args ignored
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // The context is Closeable; try-with-resources releases its beans on exit.
        // NOTE(review): assumes the JobLauncher runs the job synchronously, so the
        // job has finished before the context closes - confirm if a custom
        // TaskExecutor is configured.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(MyJobConfiguration.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /** Input item type; static so instances hold no reference to the configuration. */
    static class A {
        final String name;

        public A(String name) {
            this.name = name;
        }
    }

    /** Output item type; static so instances hold no reference to the configuration. */
    static class B {
        final String name;

        public B(String name) {
            this.name = name;
        }
    }
}
我必须按如下方式组成 2 个处理器:
processor 1
实现 ItemProcessor<A,B>
接口(转换数据)。
processor 2
实现 ItemProcessor<B,B>
接口(处理转换后的数据)。
CompositeItemProcessor<I, O>
要求各个委托处理器的类型相同,而且在将其传递给 Step
时,步骤已经配置为固定类型 <A,B>。
我如何将这些类型不同的处理器链接起来,并将其设置为 step
的处理器?
您需要使用 <A, B>
声明您的步骤以及复合处理器。这是一个简单的例子:
import java.util.Arrays;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Minimal Spring Batch configuration that chains two item processors with
 * different generic signatures ({@code ItemProcessor<A, B>} followed by
 * {@code ItemProcessor<B, B>}) behind one composite {@code ItemProcessor<A, B>}.
 *
 * <p>The step is declared with {@code <A, B>}: the reader produces {@link A},
 * the composite processor turns each {@link A} into a {@link B}, and the
 * writer consumes {@link B}.
 */
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    /** Reads two in-memory {@link A} items named "a1" and "a2". */
    @Bean
    public ItemReader<A> itemReader() {
        return new ListItemReader<>(Arrays.asList(new A("a1"), new A("a2")));
    }

    /** First stage: converts an {@link A} into a {@link B} carrying the same name. */
    @Bean
    public ItemProcessor<A, B> itemProcessor1() {
        return item -> new B(item.name);
    }

    /** Second stage: receives the already-converted {@link B}; currently a pass-through. */
    @Bean
    public ItemProcessor<B, B> itemProcessor2() {
        return item -> item; // TODO process item as needed
    }

    /**
     * Combines both stages. The composite is typed by the input of the first
     * delegate ({@link A}) and the output of the last delegate ({@link B});
     * the delegates themselves do not need to share a signature.
     */
    @Bean
    public ItemProcessor<A, B> compositeItemProcessor() {
        CompositeItemProcessor<A, B> compositeItemProcessor = new CompositeItemProcessor<>();
        // Delegates are listed in the order they should be applied.
        compositeItemProcessor.setDelegates(Arrays.asList(itemProcessor1(), itemProcessor2()));
        return compositeItemProcessor;
    }

    /** Prints the name of every {@link B} in the chunk to standard output. */
    @Bean
    public ItemWriter<B> itemWriter() {
        return items -> {
            for (B item : items) {
                System.out.println("item = " + item.name);
            }
        };
    }

    /**
     * Builds a single-step job named "job" whose step "step" processes items in
     * chunks of 2 using the reader, composite processor and writer beans above.
     *
     * @param jobs factory used to create the job
     * @param steps factory used to create the step
     * @return the assembled job
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<A, B>chunk(2)
                        .reader(itemReader())
                        .processor(compositeItemProcessor())
                        .writer(itemWriter())
                        .build())
                .build();
    }

    /**
     * Boots the application context, launches the job with empty parameters
     * and closes the context afterwards.
     *
     * @param args ignored
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // The context is Closeable; try-with-resources releases its beans on exit.
        // NOTE(review): assumes the JobLauncher runs the job synchronously, so the
        // job has finished before the context closes - confirm if a custom
        // TaskExecutor is configured.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(MyJobConfiguration.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /** Input item type; static so instances hold no reference to the configuration. */
    static class A {
        final String name;

        public A(String name) {
            this.name = name;
        }
    }

    /** Output item type; static so instances hold no reference to the configuration. */
    static class B {
        final String name;

        public B(String name) {
            this.name = name;
        }
    }
}