Spring 批处理:聚合记录和写入计数
Spring Batch : Aggregating records and write count
平面文件中有一些数据。例如
EmpCode,Salary,EmpName,...
100,1000,...,...
200,2000,...,...
200,2000,...,...
100,1000,...,...
300,3000,...,...
400,4000,...,...
我们想根据 EmpCode 汇总工资并写入数据库
Emp_Code Emp_Salary Updated_Time Updated_User
100 2000 ... ...
200 4000 ... ...
300 3000 ... ...
400 4000 ... ...
我按照Spring批次写了类如下
ItemReader - to read the employee data into a Employee object
示例 EmployeeItemProcessor:
public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {
@Override
public Employee process(Employee employee) throws Exception {
employee.setUpdatedTime(new Date());
employee.setUpdatedUser("someuser");
return employee;
}
EmployeeItemWriter:
@Repository
public class EmployeeItemWriter implements ItemWriter<Employee> {
@Autowired
private SessionFactory sf;
@Override
public void write(List<? extends Employee> employeeList) throws Exception {
List<Employee> aggEmployeeList = aggregateEmpData(employeeList);
//write to db using session factory
}
private List<Employee> aggregateEmpData(List<? extends Employee> employeeList){
Map<String, Employee> map = new HashMap<String, Employee>();
for(Employee e: employeeList){
String empCode = e.getEmpCode();
if(map.containsKey(empCode)){
//get employee salary and add up
}else{
map.put(empCode,Employee);
}
}
return new ArrayList<Employee>(map.values());
}
}
XML 配置
...
<batch:job id="employeeJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="employeeItemReader"
writer="employeeItemWriter" processor="employeeItemProcessor"
commit-interval="100">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
...
它正在发挥作用并为我服务。不过,我有几个问题。
1) 当我查看日志时,它显示如下(commit-interval=100):
status=COMPLETED,exitStatus=COMPLETED,readCount=2652,filterCount=0,writeCount=2652 readSkipCount=0,writeSkipCount=0,processSkipCount=0,commitCount=27,回滚计数=0
但是聚合之后,只有2515条记录写入了数据库。 write count是2652,是不是因为到达ItemWriter的item数量还是2652?如何纠正?
2) 我们在 ItemProcessor 中遍历列表 twice.Once,然后在 ItemWriter 中进行聚合。如果记录数较多,则可能是性能问题。有没有更好的方法来实现这个目标?
为什么要在ItemWriter
中进行聚合?我会在 ItemProcessor
内完成。这将使写入计数准确,并将该组件与实际写入行为分开。如果您对您的配置有一些了解,我们可以详细说明。
如果输入文件的每一行都是一个员工对象,那么您的 ReadCount 将是输入文件中的行数。 WriteCount 将是传递给项目编写器的所有列表大小的总和。因此,也许您的 aggregateEmpData 函数将一些记录删除或聚合为一个,因此,您的数据库计数与 WriteCount 不同。
如果您想确保 WriteCount 正好是数据库中的记录数,您应该在处理器中进行聚合。
我写好了。我是这样做的。
public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {
Map<String, Employee> map;
@Override
public Employee process(Employee employee) throws Exception {
employee.setUpdatedTime(new Date());
employee.setUpdatedUser("someuser");
String empCode = employee.getEmpCode();
if(map.containsKey(empCode)){
//get employee salary and add up
return null;
}
map.put(empCode,employee);
return employee;
}
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
map = new HashMap<String, Employee>();
}
写入计数现在正确显示。
平面文件中有一些数据。例如
EmpCode,Salary,EmpName,...
100,1000,...,...
200,2000,...,...
200,2000,...,...
100,1000,...,...
300,3000,...,...
400,4000,...,...
我们想根据 EmpCode 汇总工资并写入数据库
Emp_Code Emp_Salary Updated_Time Updated_User
100 2000 ... ...
200 4000 ... ...
300 3000 ... ...
400 4000 ... ...
我按照Spring批次写了类如下
ItemReader - to read the employee data into a Employee object
示例 EmployeeItemProcessor:
public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {
@Override
public Employee process(Employee employee) throws Exception {
employee.setUpdatedTime(new Date());
employee.setUpdatedUser("someuser");
return employee;
}
EmployeeItemWriter:
@Repository
public class EmployeeItemWriter implements ItemWriter<Employee> {
@Autowired
private SessionFactory sf;
@Override
public void write(List<? extends Employee> employeeList) throws Exception {
List<Employee> aggEmployeeList = aggregateEmpData(employeeList);
//write to db using session factory
}
private List<Employee> aggregateEmpData(List<? extends Employee> employeeList){
Map<String, Employee> map = new HashMap<String, Employee>();
for(Employee e: employeeList){
String empCode = e.getEmpCode();
if(map.containsKey(empCode)){
//get employee salary and add up
}else{
map.put(empCode,Employee);
}
}
return new ArrayList<Employee>(map.values());
}
}
XML 配置
...
<batch:job id="employeeJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="employeeItemReader"
writer="employeeItemWriter" processor="employeeItemProcessor"
commit-interval="100">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
...
它正在发挥作用并为我服务。不过,我有几个问题。
1) 当我查看日志时,它显示如下(commit-interval=100):
status=COMPLETED,exitStatus=COMPLETED,readCount=2652,filterCount=0,writeCount=2652 readSkipCount=0,writeSkipCount=0,processSkipCount=0,commitCount=27,回滚计数=0
但是聚合之后,只有2515条记录写入了数据库。 write count是2652,是不是因为到达ItemWriter的item数量还是2652?如何纠正?
2) 我们在 ItemProcessor 中遍历列表 twice.Once,然后在 ItemWriter 中进行聚合。如果记录数较多,则可能是性能问题。有没有更好的方法来实现这个目标?
为什么要在ItemWriter
中进行聚合?我会在 ItemProcessor
内完成。这将使写入计数准确,并将该组件与实际写入行为分开。如果您对您的配置有一些了解,我们可以详细说明。
如果输入文件的每一行都是一个员工对象,那么您的 ReadCount 将是输入文件中的行数。 WriteCount 将是传递给项目编写器的所有列表大小的总和。因此,也许您的 aggregateEmpData 函数将一些记录删除或聚合为一个,因此,您的数据库计数与 WriteCount 不同。 如果您想确保 WriteCount 正好是数据库中的记录数,您应该在处理器中进行聚合。
我写好了。我是这样做的。
public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {
Map<String, Employee> map;
@Override
public Employee process(Employee employee) throws Exception {
employee.setUpdatedTime(new Date());
employee.setUpdatedUser("someuser");
String empCode = employee.getEmpCode();
if(map.containsKey(empCode)){
//get employee salary and add up
return null;
}
map.put(empCode,employee);
return employee;
}
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
map = new HashMap<String, Employee>();
}
写入计数现在正确显示。