Spring batch: CSV文件处理如何拆分处理?
Spring batch: How to split and process the CSV file processing?
我有一个 spring 批处理应用程序,它使用 Azure SQL 服务器作为后端,我正在使用 Hibernate 更新数据库。
我正在使用 FlatFileItemReader 从 CSV 文件中读取数据并使用 ItemWriter 写入 Azure SQL 服务器,如下所述
/**
 * JPA entity mapped to the STUDENTS table.
 *
 * <p>The primary key is drawn from the "student_sequence" database sequence
 * (allocationSize = 1, i.e. one sequence round trip per insert — correct,
 * but slow for bulk loads).
 */
@Entity
@Table(name = "STUDENTS")
public class Student implements Serializable {

    // A Serializable class should pin its serial form explicitly.
    private static final long serialVersionUID = 1L;

    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "student_sequence")
    @SequenceGenerator(name = "student_sequence", sequenceName = "student_sequence", allocationSize = 1)
    @Column(name = "STUDENT_ID", unique = true)
    private Long studentId;

    @Column(name = "STUDENT_NAME")
    private String studentName;

    // NOTE(review): java.util.Date is legacy; java.time.LocalDate would be
    // preferable with a JPA 2.2+ provider. Kept as-is so existing callers of
    // the Date-based accessors keep working.
    @Temporal(TemporalType.DATE)
    @Column(name = "ENROLLED_DATE")
    private Date enrolledDate;

    /** @return the generated primary key, or {@code null} before the entity is persisted. */
    public Long getStudentId() {
        return studentId;
    }

    public void setStudentId(Long studentId) {
        this.studentId = studentId;
    }

    public String getStudentName() {
        return studentName;
    }

    public void setStudentName(String studentName) {
        this.studentName = studentName;
    }

    public Date getEnrolledDate() {
        return enrolledDate;
    }

    public void setEnrolledDate(Date enrolledDate) {
        this.enrolledDate = enrolledDate;
    }

    @Override
    public String toString() {
        return "Student [studentId=" + studentId + ", studentName=" + studentName + ", enrolledDate=" + enrolledDate
                + "]";
    }
}
/**
 * Maps one tokenized CSV line to a {@link Student}.
 *
 * <p>Only the first field (the student name) is read from the file; the
 * enrollment date is stamped with the current date at import time.
 *
 * <p>Fix: removed the unused {@code SimpleDateFormat} field — it was dead
 * code, and {@code SimpleDateFormat} is not thread-safe, so keeping it as
 * shared mapper state would be a latent bug if it were ever used.
 */
public class StudentMapper implements FieldSetMapper<Student> {

    @Override
    public Student mapFieldSet(FieldSet fieldSet) throws BindException {
        Student student = new Student();
        student.setStudentName(fieldSet.readString(0));
        // The CSV carries no date column; record when the row was imported.
        student.setEnrolledDate(new Date());
        return student;
    }
}
/**
 * Reads {@link Student} items from a classpath CSV file.
 *
 * <p>BUG FIX: the original class implemented
 * {@code ItemReader<FlatFileItemReader<Student>>} and returned a brand-new,
 * never-opened reader from every {@code read()} call — the step never
 * produced a Student and never reached end-of-input. This version delegates
 * to a single lazily-built {@link FlatFileItemReader} and returns one
 * Student per call, or {@code null} at end of file.
 *
 * <p>NOTE(review): the simplest correct setup is to drop this wrapper and
 * declare the {@code FlatFileItemReader} bean directly in the step
 * configuration, which also lets Spring Batch manage its open/close
 * lifecycle and restart state.
 */
public class Reader implements ItemReader<Student> {

    private FlatFileItemReader<Student> delegate;

    @Override
    public Student read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (delegate == null) {
            delegate = buildDelegate();
            // FlatFileItemReader is an ItemStream: it must be opened before use.
            delegate.open(new ExecutionContext());
        }
        return delegate.read();
    }

    /** Builds the Spring Batch reader for demo01/input/record.csv (one "studentname" column). */
    private FlatFileItemReader<Student> buildDelegate() {
        Resource inputCsv = new ClassPathResource("demo01/input/record.csv");
        FlatFileItemReader<Student> reader = new FlatFileItemReader<Student>();
        reader.setResource(inputCsv);
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] {"studentname"});
        DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<Student>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new StudentMapper());
        reader.setLineMapper(lineMapper);
        return reader;
    }
}
/**
 * Pass-through processor: logs every incoming {@link Student} to stdout and
 * hands it on unchanged.
 */
public class Processor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student item) throws Exception {
        final String message = "Processing..." + item;
        System.out.println(message);
        return item;
    }
}
/**
 * Writes chunks of {@link Student} entities to the database through Hibernate.
 *
 * <p>BUG FIX: the original built a fresh {@link HibernateItemWriter} on every
 * chunk and called {@code afterPropertiesSet()} only AFTER writing, so the
 * configuration check never guarded the write. The delegate is now built and
 * validated once, then reused for every chunk.
 *
 * <p>NOTE(review): this wrapper adds nothing over declaring a
 * {@code HibernateItemWriter} bean directly in the step configuration.
 */
public class Writer implements ItemWriter<Student> {

    @Autowired
    private SessionFactory sessionFactory;

    private HibernateItemWriter<Student> delegate;

    @Override
    public void write(List<? extends Student> list) throws Exception {
        if (delegate == null) {
            HibernateItemWriter<Student> writer = new HibernateItemWriter<Student>();
            writer.setSessionFactory(sessionFactory);
            writer.afterPropertiesSet(); // validate configuration before first use
            delegate = writer;
        }
        delegate.write(list);
    }
}
我将获得要处理的完整单个 CSV 文件,我不想拆分 CSV,因此我将使用相同的 Reader 过程来读取 CSV 记录。但是我想引入多个处理器和写入器来提高性能。类似于 AsyncItemProcessor
<!-- Prototype scope: each mapped CSV line gets a fresh Student instance. -->
<bean id="student" class="com.mkyong.entity.Student" scope="prototype" />
<!-- Chunk-oriented step: read -> process -> write, committing every 10 items. -->
<batch:job id="helloWorldJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk
reader="cvsFileItemReader"
processor="itemProcessor"
writer="itemWriter"
commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<!-- NOTE(review): bean id says "cvs" (likely meant "csv"); harmless since the
     chunk reference above matches, but worth renaming both together. -->
<bean id="cvsFileItemReader" class="com.mkyong.batch.Reader" />
<bean id="itemProcessor" class="com.mkyong.batch.Processor" />
<bean id="itemWriter" class="com.mkyong.batch.Writer" />
如何将其转换为支持 AsyncItemProcessor?
在进行异步或并行之前,您应该首先修复您共享的代码。 Reader
class 不正确。当前声明读取类型为 FlatFileItemReader
而不是 Student
的项目。 read
方法应该 return 类型 Student
的项目而不是 reader 本身。这个class不需要,你可以直接在你的步骤中使用Spring批处理提供的FlatFileItemReader
。
Writer 类也一样,你可以直接在你的步骤中使用HibernateItemWriter
。
现在要回答有关使用异步处理器和编写器的问题,您需要将处理器包装在 AsyncItemProcessor
中,将编写器包装在 AsyncItemWriter
中。您可以在此处找到更多详细信息和代码示例:Asynchronous Processors.
我有一个 spring 批处理应用程序,它使用 Azure SQL 服务器作为后端,我正在使用 Hibernate 更新数据库。
我正在使用 FlatFileItemReader 从 CSV 文件中读取数据并使用 ItemWriter 写入 Azure SQL 服务器,如下所述
/**
 * JPA entity mapped to the STUDENTS table.
 *
 * <p>The primary key is drawn from the "student_sequence" database sequence
 * (allocationSize = 1, i.e. one sequence round trip per insert — correct,
 * but slow for bulk loads).
 */
@Entity
@Table(name = "STUDENTS")
public class Student implements Serializable {

    // A Serializable class should pin its serial form explicitly.
    private static final long serialVersionUID = 1L;

    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "student_sequence")
    @SequenceGenerator(name = "student_sequence", sequenceName = "student_sequence", allocationSize = 1)
    @Column(name = "STUDENT_ID", unique = true)
    private Long studentId;

    @Column(name = "STUDENT_NAME")
    private String studentName;

    // NOTE(review): java.util.Date is legacy; java.time.LocalDate would be
    // preferable with a JPA 2.2+ provider. Kept as-is so existing callers of
    // the Date-based accessors keep working.
    @Temporal(TemporalType.DATE)
    @Column(name = "ENROLLED_DATE")
    private Date enrolledDate;

    /** @return the generated primary key, or {@code null} before the entity is persisted. */
    public Long getStudentId() {
        return studentId;
    }

    public void setStudentId(Long studentId) {
        this.studentId = studentId;
    }

    public String getStudentName() {
        return studentName;
    }

    public void setStudentName(String studentName) {
        this.studentName = studentName;
    }

    public Date getEnrolledDate() {
        return enrolledDate;
    }

    public void setEnrolledDate(Date enrolledDate) {
        this.enrolledDate = enrolledDate;
    }

    @Override
    public String toString() {
        return "Student [studentId=" + studentId + ", studentName=" + studentName + ", enrolledDate=" + enrolledDate
                + "]";
    }
}
/**
 * Maps one tokenized CSV line to a {@link Student}.
 *
 * <p>Only the first field (the student name) is read from the file; the
 * enrollment date is stamped with the current date at import time.
 *
 * <p>Fix: removed the unused {@code SimpleDateFormat} field — it was dead
 * code, and {@code SimpleDateFormat} is not thread-safe, so keeping it as
 * shared mapper state would be a latent bug if it were ever used.
 */
public class StudentMapper implements FieldSetMapper<Student> {

    @Override
    public Student mapFieldSet(FieldSet fieldSet) throws BindException {
        Student student = new Student();
        student.setStudentName(fieldSet.readString(0));
        // The CSV carries no date column; record when the row was imported.
        student.setEnrolledDate(new Date());
        return student;
    }
}
/**
 * Reads {@link Student} items from a classpath CSV file.
 *
 * <p>BUG FIX: the original class implemented
 * {@code ItemReader<FlatFileItemReader<Student>>} and returned a brand-new,
 * never-opened reader from every {@code read()} call — the step never
 * produced a Student and never reached end-of-input. This version delegates
 * to a single lazily-built {@link FlatFileItemReader} and returns one
 * Student per call, or {@code null} at end of file.
 *
 * <p>NOTE(review): the simplest correct setup is to drop this wrapper and
 * declare the {@code FlatFileItemReader} bean directly in the step
 * configuration, which also lets Spring Batch manage its open/close
 * lifecycle and restart state.
 */
public class Reader implements ItemReader<Student> {

    private FlatFileItemReader<Student> delegate;

    @Override
    public Student read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (delegate == null) {
            delegate = buildDelegate();
            // FlatFileItemReader is an ItemStream: it must be opened before use.
            delegate.open(new ExecutionContext());
        }
        return delegate.read();
    }

    /** Builds the Spring Batch reader for demo01/input/record.csv (one "studentname" column). */
    private FlatFileItemReader<Student> buildDelegate() {
        Resource inputCsv = new ClassPathResource("demo01/input/record.csv");
        FlatFileItemReader<Student> reader = new FlatFileItemReader<Student>();
        reader.setResource(inputCsv);
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] {"studentname"});
        DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<Student>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new StudentMapper());
        reader.setLineMapper(lineMapper);
        return reader;
    }
}
/**
 * Pass-through processor: logs every incoming {@link Student} to stdout and
 * hands it on unchanged.
 */
public class Processor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student item) throws Exception {
        final String message = "Processing..." + item;
        System.out.println(message);
        return item;
    }
}
/**
 * Writes chunks of {@link Student} entities to the database through Hibernate.
 *
 * <p>BUG FIX: the original built a fresh {@link HibernateItemWriter} on every
 * chunk and called {@code afterPropertiesSet()} only AFTER writing, so the
 * configuration check never guarded the write. The delegate is now built and
 * validated once, then reused for every chunk.
 *
 * <p>NOTE(review): this wrapper adds nothing over declaring a
 * {@code HibernateItemWriter} bean directly in the step configuration.
 */
public class Writer implements ItemWriter<Student> {

    @Autowired
    private SessionFactory sessionFactory;

    private HibernateItemWriter<Student> delegate;

    @Override
    public void write(List<? extends Student> list) throws Exception {
        if (delegate == null) {
            HibernateItemWriter<Student> writer = new HibernateItemWriter<Student>();
            writer.setSessionFactory(sessionFactory);
            writer.afterPropertiesSet(); // validate configuration before first use
            delegate = writer;
        }
        delegate.write(list);
    }
}
我将获得要处理的完整单个 CSV 文件,我不想拆分 CSV,因此我将使用相同的 Reader 过程来读取 CSV 记录。但是我想引入多个处理器和写入器来提高性能。类似于 AsyncItemProcessor
<!-- Prototype scope: each mapped CSV line gets a fresh Student instance. -->
<bean id="student" class="com.mkyong.entity.Student" scope="prototype" />
<!-- Chunk-oriented step: read -> process -> write, committing every 10 items. -->
<batch:job id="helloWorldJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk
reader="cvsFileItemReader"
processor="itemProcessor"
writer="itemWriter"
commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<!-- NOTE(review): bean id says "cvs" (likely meant "csv"); harmless since the
     chunk reference above matches, but worth renaming both together. -->
<bean id="cvsFileItemReader" class="com.mkyong.batch.Reader" />
<bean id="itemProcessor" class="com.mkyong.batch.Processor" />
<bean id="itemWriter" class="com.mkyong.batch.Writer" />
如何将其转换为支持 AsyncItemProcessor?
在进行异步或并行之前,您应该首先修复您共享的代码。 Reader
class 不正确。当前声明读取类型为 FlatFileItemReader
而不是 Student
的项目。 read
方法应该 return 类型 Student
的项目而不是 reader 本身。这个class不需要,你可以直接在你的步骤中使用Spring批处理提供的FlatFileItemReader
。
Writer 类也一样,你可以直接在你的步骤中使用HibernateItemWriter
。
现在要回答有关使用异步处理器和编写器的问题,您需要将处理器包装在 AsyncItemProcessor
中,将编写器包装在 AsyncItemWriter
中。您可以在此处找到更多详细信息和代码示例:Asynchronous Processors.