Spring 批次:未调用 ClassifierCompositeItemWriter 页脚
Spring batch : ClassifierCompositeItemWriter footer not getting called
我正在使用 Spring 批处理来编写多份报告。
要求是我将获得带有 BranchId 和名称的记录。我需要为每个 branchId 创建一个文件,并将相应的数据连同一些页眉和页脚写入该文件。
示例:
Student A = new Student("A",1);
Student B = new Student("B",2);
Student C = new Student("C",1);
Student D = new Student("D",4);
在这种情况下,它应该创建总共 3 个文件
file1-->1.txt(with A,C)
file2-->2.txt(with B)
file3-->4.txt(with D)
.
我正在使用 ClassifierCompositeItemWriter 创建/重用基于数据(在本例中为 id)的 FlatFileItemWriter 并能够成功创建文件。
对于页眉和页脚,我在 writer(写入器)级别使用了回调。
生成的文件只有 HEADER 和 DATA。但是不知何故,FOOTER 根本没有被执行。
我怀疑要么是文件在写入页脚之前就被关闭了,要么是使用 STEP SCOPE 时出了问题。
谁能帮我调用 FOOTER。
这是代码。
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.classify.Classifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
@Configuration
@EnableBatchProcessing
public class MyJob3 {
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob3.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
return jobs.get("job").start(steps.get("step").<Student, Student>chunk(5)
.reader(itemReader())
.writer(getStudentItemWriter(itemWriterClassifier()))
.build())
.build();
}
@Bean
@StepScope
public ItemReader<Student> itemReader() {
Student A = new Student("A", 1);
Student B = new Student("B", 2);
Student C = new Student("C", 1);
Student D = new Student("D", 4);
Student E = new Student("E", 4);
return new ListItemReader<>(Arrays.asList(A,B,C,D,E));
}
Map<Integer, FlatFileItemWriter<Student>> map = new HashMap<>();
@Bean
@StepScope
public ClassifierCompositeItemWriter<Student> getStudentItemWriter(Classifier<Student, ItemWriter<? super Student>> classifier) {
ClassifierCompositeItemWriter<Student> compositeItemWriter = new ClassifierCompositeItemWriter<>();
compositeItemWriter.setClassifier(classifier);
return compositeItemWriter;
}
@Bean
@StepScope
public Classifier<Student, ItemWriter<? super Student>> itemWriterClassifier() {
return student -> {
System.out.println("Branch Id ::" + student.getBranchId() + " and Student Name" + student.getName());
if (map.containsKey(student.getBranchId())) {
FlatFileItemWriter<Student> result = map.get(student.getBranchId());
System.out.println("Exising Writer object ::" + result);
return result;
}
String fileName ="Branch_Info_" + student.getBranchId() + ".txt";
BeanWrapperFieldExtractor<Student> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] { "branchId", "name" });
DelimitedLineAggregator<Student> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setFieldExtractor(fieldExtractor);
FlatFileItemWriter<Student> flatFileItemWriter = new FlatFileItemWriter<>();
flatFileItemWriter.setResource(new FileSystemResource(fileName));
flatFileItemWriter.setAppendAllowed(true);
flatFileItemWriter.setLineAggregator(lineAggregator);
System.out.println("Writing header...");
flatFileItemWriter.setHeaderCallback(writer -> writer.write("Header"));
System.out.println("Writing Footer...");
flatFileItemWriter.setFooterCallback(writer -> writer.write("Footer"));
System.out.println("Writing done...");
flatFileItemWriter.open(new ExecutionContext());
map.put(student.getBranchId(), flatFileItemWriter);
System.out.println("New Writer object ::" + flatFileItemWriter);
return flatFileItemWriter;
};
}
}
In my case, i don't have fixed number writers (foo & boo in your case) and they will be dynamic and need to create at RUN time. Any suggestions on how to do it and register them to step?
在这种情况下,您需要:
- 预先计算可能的不同值(在您的情况下为 1、2 和 4),例如使用 `select distinct(id) from table` 之类的查询或类似的机制,具体取决于您输入的数据
- 动态创建 `ItemWriter` bean,并在您的步骤中将它们注册为流。
以下是基于您的用例的示例:给定属于不同组的学生列表,想法是根据他们所在的组将其写入不同的文件。这是一个 tasklet,它预先计算出不同的组,并在应用程序上下文中动态地创建并注册对应的 item writer:
import java.io.IOException;
import java.io.Writer;
import java.util.List;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.file.FlatFileFooterCallback;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
/**
 * Tasklet that looks up the distinct student group ids and registers one
 * {@link FlatFileItemWriter} bean definition per group in the application context.
 *
 * <p>Each definition is registered under the bean name {@code group<groupId>Writer} and targets
 * the file {@code <groupId>.txt}, with a header and a footer callback.
 */
class DynamicWritersConfigurationTasklet implements Tasklet {

    private JdbcTemplate jdbcTemplate;
    private ConfigurableApplicationContext applicationContext;

    /**
     * @param jdbcTemplate       used to query the distinct group ids
     * @param applicationContext context whose bean factory receives the writer definitions
     */
    public DynamicWritersConfigurationTasklet(JdbcTemplate jdbcTemplate, ConfigurableApplicationContext applicationContext) {
        this.jdbcTemplate = jdbcTemplate;
        this.applicationContext = applicationContext;
    }

    /**
     * Registers a writer bean definition for every distinct {@code groupId} in the
     * {@code student} table.
     *
     * @return always {@link RepeatStatus#FINISHED}
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        BeanDefinitionRegistry definitions = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
        List<Integer> groupIds = jdbcTemplate.queryForList("select distinct(groupId) from student", Integer.class);
        groupIds.forEach(groupId -> {
            String beanName = "group" + groupId + "Writer";
            definitions.registerBeanDefinition(beanName, writerDefinition(beanName, groupId));
        });
        return RepeatStatus.FINISHED;
    }

    /** Builds the bean definition of the flat file writer for one group. */
    private static GenericBeanDefinition writerDefinition(String beanName, Integer groupId) {
        MutablePropertyValues properties = new MutablePropertyValues();
        properties.addPropertyValue("name", beanName);
        properties.addPropertyValue("lineAggregator", new PassThroughLineAggregator<>());
        properties.addPropertyValue("resource", new FileSystemResource(groupId + ".txt"));
        properties.addPropertyValue("headerCallback", (FlatFileHeaderCallback) writer -> writer.write("header"));
        properties.addPropertyValue("footerCallback", (FlatFileFooterCallback) writer -> writer.write("footer"));

        GenericBeanDefinition definition = new GenericBeanDefinition();
        definition.setBeanClassName(FlatFileItemWriter.class.getName());
        definition.setPropertyValues(properties);
        return definition;
    }
}
一旦到位,第二步会在运行时从应用程序上下文加载这些 item writer,并将它们注册为 `ClassifierCompositeItemWriter` 中的委托:
/**
 * Builds the composite writer that routes each {@code Student} to the writer bean named
 * {@code group<groupId>Writer} found in the application context.
 *
 * @param applicationContext context holding the dynamically registered writer beans
 * @return a composite writer delegating by student group
 * @throws IllegalStateException at write time, if no writer bean exists for a student's group
 */
@Bean
@StepScope
@SuppressWarnings({"rawtypes", "unchecked"}) // getBeansOfType(Class) can only yield the raw FlatFileItemWriter type
public ClassifierCompositeItemWriter<Student> itemWriter(ConfigurableApplicationContext applicationContext) {
    // dynamically get writers from the application context and register them as delegates in the composite
    Map<String, FlatFileItemWriter> beansOfType = applicationContext.getBeansOfType(FlatFileItemWriter.class);
    // Classify students by group
    Classifier<Student, ItemWriter<? super Student>> classifier = student -> {
        String beanName = "group" + student.getGroupId() + "Writer";
        ItemWriter<? super Student> delegate = beansOfType.get(beanName);
        if (delegate == null) {
            // Fail with a clear message instead of a NullPointerException inside the composite writer.
            throw new IllegalStateException("No item writer bean named '" + beanName + "' is registered");
        }
        return delegate;
    };
    return new ClassifierCompositeItemWriterBuilder<Student>()
            .classifier(classifier)
            .build();
}
/**
 * Defines the chunk-oriented step (chunk size 2) that reads students and writes them through
 * the classifying composite writer.
 *
 * <p>Every {@code FlatFileItemWriter} bean found in the application context is additionally
 * registered as a stream of the step.
 *
 * @param stepBuilderFactory factory for step builders
 * @param applicationContext context holding the writer beans
 * @param dataSource         passed on to the item reader
 * @return the configured step
 */
@Bean
@JobScope
public Step step2(StepBuilderFactory stepBuilderFactory, ConfigurableApplicationContext applicationContext, DataSource dataSource) {
    SimpleStepBuilder<Student, Student> builder = stepBuilderFactory.get("readWriteStudents")
            .<Student, Student>chunk(2)
            .reader(itemReader(dataSource))
            .writer(itemWriter(applicationContext));
    // register writers as streams in the step so that open/update/close are called correctly
    applicationContext.getBeansOfType(FlatFileItemWriter.class)
            .values()
            .forEach(builder::stream);
    return builder.build();
}
我这里有一个完整的示例:sample app for SO67604628。如果您不想克隆整个存储库,请参阅这份指南了解如何只检出单个文件夹。该示例生成 3 个文件,其中学生按 groupId 分组。请注意,页眉/页脚之所以能够正确生成,是因为委托编写器在步骤中被注册为流。
我正在使用 Spring 批处理来编写多份报告。 要求是我将获得带有 BranchId 和名称的记录。我需要为每个 branchId 创建一个文件,并将相应的数据连同一些页眉和页脚写入该文件。
示例:
Student A = new Student("A",1);
Student B = new Student("B",2);
Student C = new Student("C",1);
Student D = new Student("D",4);
在这种情况下,它应该创建总共 3 个文件
file1-->1.txt(with A,C)
file2-->2.txt(with B)
file3-->4.txt(with D)
.
我正在使用 ClassifierCompositeItemWriter 创建/重用基于数据(在本例中为 id)的 FlatFileItemWriter 并能够成功创建文件。 对于页眉和页脚,我在 writer(写入器)级别使用了回调。 生成的文件只有 HEADER 和 DATA。但是不知何故,FOOTER 根本没有被执行。
我怀疑要么是文件在写入页脚之前就被关闭了,要么是使用 STEP SCOPE 时出了问题。
谁能帮我调用 FOOTER。
这是代码。
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.classify.Classifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
@Configuration
@EnableBatchProcessing
public class MyJob3 {
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob3.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
return jobs.get("job").start(steps.get("step").<Student, Student>chunk(5)
.reader(itemReader())
.writer(getStudentItemWriter(itemWriterClassifier()))
.build())
.build();
}
@Bean
@StepScope
public ItemReader<Student> itemReader() {
Student A = new Student("A", 1);
Student B = new Student("B", 2);
Student C = new Student("C", 1);
Student D = new Student("D", 4);
Student E = new Student("E", 4);
return new ListItemReader<>(Arrays.asList(A,B,C,D,E));
}
Map<Integer, FlatFileItemWriter<Student>> map = new HashMap<>();
@Bean
@StepScope
public ClassifierCompositeItemWriter<Student> getStudentItemWriter(Classifier<Student, ItemWriter<? super Student>> classifier) {
ClassifierCompositeItemWriter<Student> compositeItemWriter = new ClassifierCompositeItemWriter<>();
compositeItemWriter.setClassifier(classifier);
return compositeItemWriter;
}
@Bean
@StepScope
public Classifier<Student, ItemWriter<? super Student>> itemWriterClassifier() {
return student -> {
System.out.println("Branch Id ::" + student.getBranchId() + " and Student Name" + student.getName());
if (map.containsKey(student.getBranchId())) {
FlatFileItemWriter<Student> result = map.get(student.getBranchId());
System.out.println("Exising Writer object ::" + result);
return result;
}
String fileName ="Branch_Info_" + student.getBranchId() + ".txt";
BeanWrapperFieldExtractor<Student> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] { "branchId", "name" });
DelimitedLineAggregator<Student> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setFieldExtractor(fieldExtractor);
FlatFileItemWriter<Student> flatFileItemWriter = new FlatFileItemWriter<>();
flatFileItemWriter.setResource(new FileSystemResource(fileName));
flatFileItemWriter.setAppendAllowed(true);
flatFileItemWriter.setLineAggregator(lineAggregator);
System.out.println("Writing header...");
flatFileItemWriter.setHeaderCallback(writer -> writer.write("Header"));
System.out.println("Writing Footer...");
flatFileItemWriter.setFooterCallback(writer -> writer.write("Footer"));
System.out.println("Writing done...");
flatFileItemWriter.open(new ExecutionContext());
map.put(student.getBranchId(), flatFileItemWriter);
System.out.println("New Writer object ::" + flatFileItemWriter);
return flatFileItemWriter;
};
}
}
In my case, i don't have fixed number writers (foo & boo in your case) and they will be dynamic and need to create at RUN time. Any suggestions on how to do it and register them to step?
在这种情况下,您需要:
- 预先计算可能的不同值(在您的情况下为 1、2 和 4),例如使用 `select distinct(id) from table` 之类的查询或类似的机制,具体取决于您输入的数据
- 动态创建 `ItemWriter` bean,并在您的步骤中将它们注册为流。
以下是基于您的用例的示例:给定属于不同组的学生列表,想法是根据他们所在的组将其写入不同的文件。这是一个 tasklet,它预先计算出不同的组,并在应用程序上下文中动态地创建并注册对应的 item writer:
import java.io.IOException;
import java.io.Writer;
import java.util.List;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.file.FlatFileFooterCallback;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
/**
 * Tasklet that looks up the distinct student group ids and registers one
 * {@link FlatFileItemWriter} bean definition per group in the application context.
 *
 * <p>Each definition is registered under the bean name {@code group<groupId>Writer} and targets
 * the file {@code <groupId>.txt}, with a header and a footer callback.
 */
class DynamicWritersConfigurationTasklet implements Tasklet {

    private JdbcTemplate jdbcTemplate;
    private ConfigurableApplicationContext applicationContext;

    /**
     * @param jdbcTemplate       used to query the distinct group ids
     * @param applicationContext context whose bean factory receives the writer definitions
     */
    public DynamicWritersConfigurationTasklet(JdbcTemplate jdbcTemplate, ConfigurableApplicationContext applicationContext) {
        this.jdbcTemplate = jdbcTemplate;
        this.applicationContext = applicationContext;
    }

    /**
     * Registers a writer bean definition for every distinct {@code groupId} in the
     * {@code student} table.
     *
     * @return always {@link RepeatStatus#FINISHED}
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        BeanDefinitionRegistry definitions = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
        List<Integer> groupIds = jdbcTemplate.queryForList("select distinct(groupId) from student", Integer.class);
        groupIds.forEach(groupId -> {
            String beanName = "group" + groupId + "Writer";
            definitions.registerBeanDefinition(beanName, writerDefinition(beanName, groupId));
        });
        return RepeatStatus.FINISHED;
    }

    /** Builds the bean definition of the flat file writer for one group. */
    private static GenericBeanDefinition writerDefinition(String beanName, Integer groupId) {
        MutablePropertyValues properties = new MutablePropertyValues();
        properties.addPropertyValue("name", beanName);
        properties.addPropertyValue("lineAggregator", new PassThroughLineAggregator<>());
        properties.addPropertyValue("resource", new FileSystemResource(groupId + ".txt"));
        properties.addPropertyValue("headerCallback", (FlatFileHeaderCallback) writer -> writer.write("header"));
        properties.addPropertyValue("footerCallback", (FlatFileFooterCallback) writer -> writer.write("footer"));

        GenericBeanDefinition definition = new GenericBeanDefinition();
        definition.setBeanClassName(FlatFileItemWriter.class.getName());
        definition.setPropertyValues(properties);
        return definition;
    }
}
一旦到位,第二步会在运行时从应用程序上下文加载这些 item writer,并将它们注册为 `ClassifierCompositeItemWriter` 中的委托:
/**
 * Builds the composite writer that routes each {@code Student} to the writer bean named
 * {@code group<groupId>Writer} found in the application context.
 *
 * @param applicationContext context holding the dynamically registered writer beans
 * @return a composite writer delegating by student group
 * @throws IllegalStateException at write time, if no writer bean exists for a student's group
 */
@Bean
@StepScope
@SuppressWarnings({"rawtypes", "unchecked"}) // getBeansOfType(Class) can only yield the raw FlatFileItemWriter type
public ClassifierCompositeItemWriter<Student> itemWriter(ConfigurableApplicationContext applicationContext) {
    // dynamically get writers from the application context and register them as delegates in the composite
    Map<String, FlatFileItemWriter> beansOfType = applicationContext.getBeansOfType(FlatFileItemWriter.class);
    // Classify students by group
    Classifier<Student, ItemWriter<? super Student>> classifier = student -> {
        String beanName = "group" + student.getGroupId() + "Writer";
        ItemWriter<? super Student> delegate = beansOfType.get(beanName);
        if (delegate == null) {
            // Fail with a clear message instead of a NullPointerException inside the composite writer.
            throw new IllegalStateException("No item writer bean named '" + beanName + "' is registered");
        }
        return delegate;
    };
    return new ClassifierCompositeItemWriterBuilder<Student>()
            .classifier(classifier)
            .build();
}
/**
 * Defines the chunk-oriented step (chunk size 2) that reads students and writes them through
 * the classifying composite writer.
 *
 * <p>Every {@code FlatFileItemWriter} bean found in the application context is additionally
 * registered as a stream of the step.
 *
 * @param stepBuilderFactory factory for step builders
 * @param applicationContext context holding the writer beans
 * @param dataSource         passed on to the item reader
 * @return the configured step
 */
@Bean
@JobScope
public Step step2(StepBuilderFactory stepBuilderFactory, ConfigurableApplicationContext applicationContext, DataSource dataSource) {
    SimpleStepBuilder<Student, Student> builder = stepBuilderFactory.get("readWriteStudents")
            .<Student, Student>chunk(2)
            .reader(itemReader(dataSource))
            .writer(itemWriter(applicationContext));
    // register writers as streams in the step so that open/update/close are called correctly
    applicationContext.getBeansOfType(FlatFileItemWriter.class)
            .values()
            .forEach(builder::stream);
    return builder.build();
}
我这里有一个完整的示例:sample app for SO67604628。如果您不想克隆整个存储库,请参阅这份指南了解如何只检出单个文件夹。该示例生成 3 个文件,其中学生按 groupId 分组。请注意,页眉/页脚之所以能够正确生成,是因为委托编写器在步骤中被注册为流。