Spring batch: CSV文件处理如何拆分处理?

Spring batch: How to split and process the CSV file processing?

我有一个 spring 批处理应用程序,它使用 Azure SQL 服务器作为后端,我正在使用 Hibernate 更新数据库。

我正在使用 FlatfileReader 从 CSV 文件中读取数据并使用 ItemWriter 写入 Azure SQL 服务器,如下所述

    @Entity
    @Table(name = "STUDENTS")
    public class Student implements Serializable {
    
        @Id
        @GeneratedValue(strategy=GenerationType.SEQUENCE,generator="student_sequence")
        @SequenceGenerator(name="student_sequence",sequenceName="student_sequence", allocationSize = 1)
        @Column(name="STUDENT_ID", unique = true)
        private Long studentId;
    
        @Column(name = "STUDENT_NAME")
        private String studentName;
    
        @Temporal(TemporalType.DATE)
        @Column(name = "ENROLLED_DATE")
        private Date enrolledDate;
    
        public String getStudentName() {
            return studentName;
        }
    
        public void setStudentName(String studentName) {
            this.studentName = studentName;
        }
    
        public Date getEnrolledDate() {
            return enrolledDate;
        }
    
        public void setEnrolledDate(Date enrolledDate) {
            this.enrolledDate = enrolledDate;
        }
    
        @Override
        public String toString() {
            return "Student [studentId=" + studentId + ", studentName=" + studentName + ", enrolledDate=" + enrolledDate
                    + "]";
        }
    }

public class StudentMapper implements FieldSetMapper<Student> {

    private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
    
    @Override
    public Student mapFieldSet(FieldSet fieldSet) throws BindException {
        
        Student student = new Student();
        student.setStudentName(fieldSet.readString(0));
        student.setEnrolledDate(new Date());

        return student;
    }

}

public class Reader implements ItemReader<FlatFileItemReader<Student>> {

    @Override
    public FlatFileItemReader<Student> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        // Data Sources
        Resource inputCsv = new ClassPathResource("demo01/input/record.csv");

        // Spring Batch's built-in reader
        FlatFileItemReader<Student> reader = new FlatFileItemReader<Student>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        String[] tokens = {"studentname"};
        tokenizer.setNames(tokens);
        reader.setResource(inputCsv);
        DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<Student>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new StudentMapper());
        reader.setLineMapper(lineMapper);
        return reader;
    }
}

public class Processor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student item) throws Exception {
        
        System.out.println("Processing..." + item);
        return item;
    }

}

public class Writer implements ItemWriter<Student> {

    @Autowired
    private SessionFactory sessionFactory;

    @Override
    public void write(List<? extends Student> list) throws Exception {
        HibernateItemWriter<Student> hibernateItemWriter = new HibernateItemWriter<Student>();
        hibernateItemWriter.setSessionFactory(sessionFactory);
        hibernateItemWriter.write(list);
        hibernateItemWriter.afterPropertiesSet();
    }
}

我将获得要处理的完整单个 CSV 文件,我不想拆分 CSV,因此我将使用相同的 Reader 过程来读取 CSV 记录。但是我想引入多个处理器和写入器来提高性能。类似于 AsyncItemProcessor

<bean id="student" class="com.mkyong.entity.Student" scope="prototype" />

<batch:job id="helloWorldJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk
                    reader="cvsFileItemReader"
                    processor="itemProcessor"
                    writer="itemWriter"
                commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

<bean id="cvsFileItemReader" class="com.mkyong.batch.Reader" />

<bean id="itemProcessor" class="com.mkyong.batch.Processor" />

<bean id="itemWriter" class="com.mkyong.batch.Writer" />

如何将其转换为支持 AsyncItemProcessor

在进行异步或并行之前,您应该首先修复您共享的代码。 Reader class 不正确。当前声明读取类型为 FlatFileItemReader 而不是 Student 的项目。 read 方法应该 return 类型 Student 的项目而不是 reader 本身。这个class不需要,你可以直接在你的步骤中使用Spring批处理提供的FlatFileItemReader

作者也一样,你可以直接在你的步骤中使用HibernateItemWriter

现在要回答有关使用异步处理器和编写器的问题,您需要将处理器包装在 AsyncItemProcessor 中,将编写器包装在 AsyncItemWriter 中。您可以在此处找到更多详细信息和代码示例:Asynchronous Processors.