Spring batch DB to JSON files
This question might seem like a duplicate of an existing one, but it is not.
My requirement is to read data from the database using a JdbcPagingItemReader, process individual records for some additional processing, and, in the writer, create a separate JSON file for each processed item, named id_of_record_json_file.txt.
For example, if the reader reads 100 records, then 100 JSON files must be created.
What is the best way to do this? Can we use Spring Batch for it?
Update 1:
As per @Mahmoud's answer, a tasklet can be used. I also tried implementing a custom ItemWriter in a chunk-oriented step, and that seems to work as well:
@Override
public void write(final List<? extends Person> persons) throws Exception {
    for (Person person : persons) {
        // write each record to its own JSON file, named after the record id
        objectMapper.writeValue(new File("D:/cp/" + person.getId() + ".json"), person);
    }
}
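For reference, wiring such a custom writer into a chunk-oriented step could look like the sketch below. It assumes the write method above lives in a class named PersonJsonItemWriter that implements ItemWriter<Person> and holds the ObjectMapper; the class and step names are illustrative, not from the original post:

    @Bean
    public Job chunkJob(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("chunkJob")
                .start(steps.get("chunkStep")
                        .<Person, Person>chunk(10)          // commit interval: 10 items per transaction
                        .reader(itemReader())               // the JdbcPagingItemReader bean
                        .writer(new PersonJsonItemWriter()) // hypothetical custom writer shown above
                        .build())
                .build();
    }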
Using a chunk-oriented tasklet won't work here, because there would be a single item writer whose resource is set upfront and stays fixed for the entire step. Using a composite item writer might work, but you would need to know how many distinct writers to create and configure upfront.
The most straightforward option I see is to use a tasklet, for example:
import java.util.Collections;
import java.util.HashMap;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@Configuration
@EnableBatchProcessing
public class MyJob {

    @Bean
    public JdbcPagingItemReader<Person> itemReader() {
        return new JdbcPagingItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .beanRowMapper(Person.class)
                .selectClause("select *")
                .fromClause("from person")
                .sortKeys(new HashMap<String, Order>() {{ put("id", Order.DESCENDING); }})
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .tasklet(new MyTasklet(itemReader()))
                        .build())
                .build();
    }

    private static class MyTasklet implements Tasklet {

        private boolean readerInitialized;
        private JdbcPagingItemReader<Person> itemReader;

        public MyTasklet(JdbcPagingItemReader<Person> itemReader) {
            this.itemReader = itemReader;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
            if (!readerInitialized) {
                itemReader.open(executionContext);
                readerInitialized = true;
            }
            Person person = itemReader.read();
            if (person == null) {
                itemReader.close();
                return RepeatStatus.FINISHED;
            }
            // process the item
            process(person);
            // write the item in its own file (dynamically generated at runtime)
            write(person, executionContext);
            // save current state in the execution context: in case of a restart
            // after failure, the job would resume where it left off
            itemReader.update(executionContext);
            return RepeatStatus.CONTINUABLE;
        }

        private void process(Person person) {
            // do something with the item
        }

        private void write(Person person, ExecutionContext executionContext) throws Exception {
            // create a dedicated writer for this item, with a resource derived from its id
            FlatFileItemWriter<Person> itemWriter = new FlatFileItemWriterBuilder<Person>()
                    .resource(new FileSystemResource("person" + person.getId() + ".csv"))
                    .name("personItemWriter")
                    .delimited()
                    .names("id", "name")
                    .build();
            itemWriter.open(executionContext);
            itemWriter.write(Collections.singletonList(person));
            itemWriter.close();
        }
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    @Bean
    public DataSource dataSource() {
        EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
        JdbcTemplate jdbcTemplate = new JdbcTemplate(embeddedDatabase);
        // seed some sample data for the example
        jdbcTemplate.execute("create table person (id int primary key, name varchar(20));");
        for (int i = 1; i <= 10; i++) {
            jdbcTemplate.execute(String.format("insert into person values (%s, 'foo%s');", i, i));
        }
        return embeddedDatabase;
    }

    static class Person {

        private int id;
        private String name;

        public Person() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{id=" + id + ", name='" + name + '\'' + '}';
        }
    }
}
This example reads 10 persons from the database table and generates 10 CSV files (person1.csv, person2.csv, etc.).
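The question asks for JSON rather than CSV. To produce JSON, the tasklet's write method could serialize each record with Jackson instead of building a FlatFileItemWriter. A minimal sketch, assuming Jackson (com.fasterxml.jackson.databind.ObjectMapper, plus a java.io.File import) is on the classpath and reusing the file-name pattern from the question; the objectMapper field is an assumed addition to the tasklet:

    private final ObjectMapper objectMapper = new ObjectMapper();

    private void write(Person person, ExecutionContext executionContext) throws Exception {
        // one JSON file per record, named after the record id as the question requires
        File target = new File(person.getId() + "_json_file.txt");
        objectMapper.writeValue(target, person);
    }

With this variant, running the same job would produce 1_json_file.txt, 2_json_file.txt, and so on, one file per row in the person table.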