Spring 将 DB 批处理到 JSON 个文件

Spring batch DB to JSON files

这个问题似乎与 重复,但它不是

我的要求是使用 JdbcPagingItemReader 从数据库中读取数据并处理单个记录以进行一些额外的处理,并在编写器中为每个处理过的项目创建单独的 json 文件姓名 id_of_record_json_fie.txt

例如,如果 reader 读取 100 条记录,则必须创建 100 JSON 个文件

最好的方法是什么,我们可以使用 spring 批处理吗?

更新 1-:

根据@Mahmoud 的回答,可以使用 tasklet,我也尝试在面向块的步骤中实现自定义 itemwriter,这似乎也有效

      @Override
        public void write(final List<? extends Person> persons) throws Exception {
            
            for (Person  person: persons) {
                objectMapper.writeValue(new File("D:/cp/dataTwo.json"), person);
            }
            
        }

使用面向块的 tasklet 是行不通的,因为会有一个单独的项目编写器,其资源是预先设置的,并且在整个步骤中都是固定的。使用复合项目编写器可能有效,但您需要知道要预先创建和配置多少个不同的编写器。

我看到的最直接的选择是使用 tasklet,例如:

import java.util.Collections;
import java.util.HashMap;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Bean
    public JdbcPagingItemReader<Person> itemReader() {
        return new JdbcPagingItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .beanRowMapper(Person.class)
                .selectClause("select *")
                .fromClause("from person")
                .sortKeys(new HashMap<String, Order>() {{ put("id", Order.DESCENDING);}})
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .tasklet(new MyTasklet(itemReader()))
                        .build())
                .build();
    }
    
    private static class MyTasklet implements Tasklet {

        private boolean readerInitialized;
        private JdbcPagingItemReader<Person> itemReader;

        public MyTasklet(JdbcPagingItemReader<Person> itemReader) {
            this.itemReader = itemReader;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
            if (!readerInitialized) {
                itemReader.open(executionContext);
                readerInitialized = true;
            }
            Person person = itemReader.read();
            if (person == null) {
                itemReader.close();
                return RepeatStatus.FINISHED;
            }
            // process the item
            process(person);
            // write the item in its own file (dynamically generated at runtime)
            write(person, executionContext);
            // save current state in execution context: in case of restart after failure, the job would resume where it left off.
            itemReader.update(executionContext);
            return RepeatStatus.CONTINUABLE;
        }

        private void process(Person person) {
            // do something with the item
        }
        
        private void write(Person person, ExecutionContext executionContext) throws Exception {
            FlatFileItemWriter<Person> itemWriter = new FlatFileItemWriterBuilder<Person>()
                    .resource(new FileSystemResource("person" + person.getId() + ".csv"))
                    .name("personItemWriter")
                    .delimited()
                    .names("id", "name")
                    .build();
            itemWriter.open(executionContext);
            itemWriter.write(Collections.singletonList(person));
            itemWriter.close();
        }
        
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    @Bean
    public DataSource dataSource() {
        EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
        JdbcTemplate jdbcTemplate = new JdbcTemplate(embeddedDatabase);
        jdbcTemplate.execute("create table person (id int primary key, name varchar(20));");
        for (int i = 1; i <= 10; i++) {
            jdbcTemplate.execute(String.format("insert into person values (%s, 'foo%s');", i, i));
        }
        return embeddedDatabase;
    }

    static class Person {
        private int id;
        private String name;

        public Person() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String toString() {
            return "Person{id=" + id + ", name='" + name + '\'' + '}';
        }
    }

}

此示例从数据库 table 中读取 10 个人并生成 10 个 csv 文件(person1.csvperson2.csv 等)