Spring Batch Partitions Join 覆盖 RowMapper 值因此得到单个数组而不是多个
Spring Batch Partitions Join overriding the RowMapper Values hence getting single array instead of multiple
我正在使用 Spring Batch 从 Postgres 读取数据并将其写入 MongoDB。在我的例子中,每个 Employee 在员工电子邮件表(employee_email)中有 3 种不同类型的电子邮件地址:1) 家庭邮箱;2) 办公邮箱;3) 社交邮箱。
由于数据库中有将近 10 lacs(约 100 万)名员工,我使用自定义分区(Custom Partitions)从 Postgres 中提取数据,并与 employee_email(稍后还有 employee_phone)进行连接,以便在处理器中映射为 Mongo POJO 并保存到 MongoDB 中。
现在的问题是:我需要将员工的电子邮件记录作为数组嵌入到联系人中,但按照当前逻辑,它们被保存为单独的集合。
我们如何解决这个问题?
select * from root.employees c
full outer join root.employee_email ce
on c.employee_id = ce.employee_id
order by c.employee_id limit 1000 offset 0;
现在当数据保存到数据库中时,每个员工只保存了一个电子邮件地址,其他 2 个似乎被覆盖了。
我该如何处理?看起来 EmployeeRowMapper 覆盖了其他所有电子邮件地址。我该如何解决这个问题?
EmployeeJob.java
/**
 * Spring Batch configuration for the employee export job.
 *
 * <p>The job runs one partitioned master step ({@code EmployeeStepOne}). Each partition executes
 * {@code slaveStep}, which reads one page of employees joined with their e-mail rows, passes every
 * item through the processor and hands it to the writer.
 *
 * <p>NOTE(review): {@code EmployeeRowMapper} maps one result-set row to one item. An employee with
 * three e-mail rows is therefore still read as three items with one e-mail each. Getting one item
 * per employee needs either an aggregating reader or the "driving query" pattern (read employees
 * only, then load their e-mails in the processor). Also confirm the row mapper's item type: it is
 * declared as {@code RowMapper<Employee>}, while the reader here is typed for {@code EmployeesDTO}.
 */
@Configuration
public class EmployeeJob {
    private static final Logger logger = LoggerFactory.getLogger(EmployeeJob.class);

    /** Commit interval of the worker step (items per chunk). */
    private static final Integer CHUNK_SIZE = 1000;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Partitioner for the master step.
     *
     * <p>Presumably it puts the {@code limit}/{@code offset} entries that {@link #EmployeeReader}
     * reads into each partition's step execution context. Confirm this in {@code EmployeesPartitions}.
     */
    @Bean
    public EmployeesPartitions EmployeesPartition() {
        return new EmployeesPartitions();
    }

    /** Listener attached to {@code readEmployeeJob}. */
    @Bean
    public EmployeesJobListener EmployeesJobListener() {
        return new EmployeesJobListener();
    }

    /**
     * Job entry point: a single partitioned step.
     *
     * <p>{@link RunIdIncrementer} gives every launch a new run id, so the job can be re-launched
     * with otherwise identical parameters.
     */
    @Bean("readEmployeeJob")
    public Job readEmployeeJob() throws Exception {
        return jobBuilderFactory.get("readEmployeeJob")
                .incrementer(new RunIdIncrementer())
                .start(EmployeeStepOne())
                .listener(EmployeesJobListener())
                .build();
    }

    /**
     * Executor that runs the partitions.
     *
     * <p>{@link SimpleAsyncTaskExecutor} starts a new thread per task. Concurrency is capped here
     * at the number of available processors.
     */
    @Bean
    public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("fac-thrd-");
        taskExecutor.setConcurrencyLimit(Runtime.getRuntime().availableProcessors());
        taskExecutor.setThreadGroupName("Employees-Thread");
        taskExecutor.setDaemon(false);
        taskExecutor.setThreadPriority(5);
        return taskExecutor;
    }

    /**
     * Master step: asks the partitioner for 10 partitions ({@code gridSize} is passed to it as a
     * hint) and runs {@code slaveStep} for each one on {@link #simpleAsyncTaskExecutor()}.
     */
    @Bean
    public Step EmployeeStepOne() throws Exception {
        return stepBuilderFactory.get("EmployeeStepOne")
                .partitioner(slaveStep().getName(), EmployeesPartition())
                .step(slaveStep())
                .gridSize(10)
                .taskExecutor(simpleAsyncTaskExecutor())
                .build();
    }

    /**
     * Worker step executed once per partition: read, process and write in chunks of
     * {@link #CHUNK_SIZE}.
     */
    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<EmployeesDTO, EmployeesDTO>chunk(CHUNK_SIZE)
                // The arguments are placeholders: the reader is @StepScope, so the real
                // limit/offset are late-bound from each partition's step execution context.
                .reader(EmployeeReader(null, null))
                // The processor bean was declared but never wired in, so the processing stage
                // (presumably where the Mongo POJO mapping happens; confirm) never ran.
                .processor(EmployeeProcessor())
                .writer(EmployeeWriter())
                .build();
    }

    /**
     * Creates the step-scoped reader for one partition.
     *
     * <p>Paging is applied to employees, not to joined rows. Applying LIMIT/OFFSET to the joined
     * result could split one employee's e-mail rows across two partitions, which makes merging
     * them impossible. The join must be a LEFT JOIN. With a FULL OUTER JOIN against the paged
     * subquery, every partition would also receive all e-mail rows whose employee is not in that
     * page.
     *
     * @param limit number of employees in this partition, from the step execution context
     * @param offset index of the first employee of this partition, from the step execution context
     * @return a cursor reader over this partition's employees and their e-mail rows
     * @throws Exception if the reader's property validation fails
     */
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader<EmployeesDTO> EmployeeReader(
            @Value("#{stepExecutionContext['limit']}") Long limit,
            @Value("#{stepExecutionContext['offset']}") Long offset) throws Exception {
        String sql = "select * from (select * from root.Employees "
                + "order by Employee_id limit ? offset ?) c "
                + "left join root.Employee_email ce "
                + "on c.Employee_id = ce.Employee_id "
                + "order by c.Employee_id";
        logger.info("Employees SQL = {} ", sql);
        logger.info("Employees partition limit = {}, offset = {}", limit, offset);
        JdbcCursorItemReader<EmployeesDTO> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setSql(sql);
        // Bind the paging values as statement parameters instead of concatenating them into the
        // SQL text. The null check runs when the cursor is opened, not during bean creation.
        reader.setPreparedStatementSetter(ps -> {
            if (limit == null || offset == null) {
                throw new IllegalStateException(
                        "Partition is missing 'limit'/'offset' in its step execution context: limit="
                                + limit + ", offset=" + offset);
            }
            ps.setLong(1, limit);
            ps.setLong(2, offset);
        });
        reader.setRowMapper(new EmployeeRowMapper());
        reader.afterPropertiesSet();
        return reader;
    }

    /** Item processor applied to every item read by {@code slaveStep}. */
    @Bean
    public ItemProcessor<EmployeesDTO, EmployeesDTO> EmployeeProcessor() {
        return new EmployeesProcessor();
    }

    /** Item writer that receives each processed chunk of {@code slaveStep}. */
    @Bean
    public ItemWriter<EmployeesDTO> EmployeeWriter() {
        return new EmployeeWriter();
    }
}
EmployeeRowMapper.java
/**
 * Maps one row of the employee / employee_email join to an {@link Employee}.
 *
 * <p>NOTE(review): this is where the "only one e-mail survives" symptom comes from. A RowMapper
 * maps a single database row. Every call below builds a brand-new list containing only that row's
 * e-mail, so an employee with three e-mail rows becomes three separate Employee objects with one
 * e-mail each. Merging them requires an aggregating reader, or fetching the e-mails separately
 * (driving query pattern).
 */
public class EmployeeRowMapper implements RowMapper<Employee> {
@Override
public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {
// EmployeeEmail email = new EmployeeEmail();
// NOTE(review): the declaration of `email` above is commented out, yet `email` is used below;
// presumably it is declared and populated in the elided lines. Confirm.
....
....
....
....
....
// A fresh single-element list per row: e-mails from the employee's other rows are never added.
List<EmployeeEmail> employeeEmails = new ArrayList<>();
employeeEmails.add(email);
Employee dto = Employee.builder()
// NOTE(review): an empty column label is not expected to match any result-set column;
// presumably a placeholder left from trimming the snippet. Confirm.
.businessTitle(rs.getString(""))
...........
...........
...........
...........
...........
...........
...........
.employeeEmails(employeeEmails)
.build();
return dto;
}
}
RowMapper 用于将 *单个* 数据库行映射到一个 POJO。因此,除非每一行在不同的列中包含所有电子邮件(例如 id,name,email1,email2,email3),否则您尝试执行的操作将不起作用。
如果每个条目对应 3 行电子邮件数据,那么您只需在驱动查询中返回 id,name,然后使用额外的查询来获取电子邮件。这个附加查询可以在映射器本身或条目处理器(item processor)中完成,如 driving query pattern(驱动查询模式)中所述。
我正在使用 Spring Batch 从 Postgres 读取数据并将其写入 MongoDB。在我的例子中,每个 Employee 在员工电子邮件表(employee_email)中有 3 种不同类型的电子邮件地址:1) 家庭邮箱;2) 办公邮箱;3) 社交邮箱。
由于数据库中有将近 10 lacs(约 100 万)名员工,我使用自定义分区(Custom Partitions)从 Postgres 中提取数据,并与 employee_email(稍后还有 employee_phone)进行连接,以便在处理器中映射为 Mongo POJO 并保存到 MongoDB 中。
现在的问题是:我需要将员工的电子邮件记录作为数组嵌入到联系人中,但按照当前逻辑,它们被保存为单独的集合。
我们如何解决这个问题?
select * from root.employees c
full outer join root.employee_email ce
on c.employee_id = ce.employee_id
order by c.employee_id limit 1000 offset 0;
现在当数据保存到数据库中时,每个员工只保存了一个电子邮件地址,其他 2 个似乎被覆盖了。
我该如何处理?看起来 EmployeeRowMapper 覆盖了其他所有电子邮件地址。我该如何解决这个问题?
EmployeeJob.java
/**
 * Spring Batch configuration for the employee export job.
 *
 * <p>The job runs one partitioned master step ({@code EmployeeStepOne}). Each partition executes
 * {@code slaveStep}, which reads one page of employees joined with their e-mail rows, passes every
 * item through the processor and hands it to the writer.
 *
 * <p>NOTE(review): {@code EmployeeRowMapper} maps one result-set row to one item. An employee with
 * three e-mail rows is therefore still read as three items with one e-mail each. Getting one item
 * per employee needs either an aggregating reader or the "driving query" pattern (read employees
 * only, then load their e-mails in the processor). Also confirm the row mapper's item type: it is
 * declared as {@code RowMapper<Employee>}, while the reader here is typed for {@code EmployeesDTO}.
 */
@Configuration
public class EmployeeJob {
    private static final Logger logger = LoggerFactory.getLogger(EmployeeJob.class);

    /** Commit interval of the worker step (items per chunk). */
    private static final Integer CHUNK_SIZE = 1000;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Partitioner for the master step.
     *
     * <p>Presumably it puts the {@code limit}/{@code offset} entries that {@link #EmployeeReader}
     * reads into each partition's step execution context. Confirm this in {@code EmployeesPartitions}.
     */
    @Bean
    public EmployeesPartitions EmployeesPartition() {
        return new EmployeesPartitions();
    }

    /** Listener attached to {@code readEmployeeJob}. */
    @Bean
    public EmployeesJobListener EmployeesJobListener() {
        return new EmployeesJobListener();
    }

    /**
     * Job entry point: a single partitioned step.
     *
     * <p>{@link RunIdIncrementer} gives every launch a new run id, so the job can be re-launched
     * with otherwise identical parameters.
     */
    @Bean("readEmployeeJob")
    public Job readEmployeeJob() throws Exception {
        return jobBuilderFactory.get("readEmployeeJob")
                .incrementer(new RunIdIncrementer())
                .start(EmployeeStepOne())
                .listener(EmployeesJobListener())
                .build();
    }

    /**
     * Executor that runs the partitions.
     *
     * <p>{@link SimpleAsyncTaskExecutor} starts a new thread per task. Concurrency is capped here
     * at the number of available processors.
     */
    @Bean
    public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("fac-thrd-");
        taskExecutor.setConcurrencyLimit(Runtime.getRuntime().availableProcessors());
        taskExecutor.setThreadGroupName("Employees-Thread");
        taskExecutor.setDaemon(false);
        taskExecutor.setThreadPriority(5);
        return taskExecutor;
    }

    /**
     * Master step: asks the partitioner for 10 partitions ({@code gridSize} is passed to it as a
     * hint) and runs {@code slaveStep} for each one on {@link #simpleAsyncTaskExecutor()}.
     */
    @Bean
    public Step EmployeeStepOne() throws Exception {
        return stepBuilderFactory.get("EmployeeStepOne")
                .partitioner(slaveStep().getName(), EmployeesPartition())
                .step(slaveStep())
                .gridSize(10)
                .taskExecutor(simpleAsyncTaskExecutor())
                .build();
    }

    /**
     * Worker step executed once per partition: read, process and write in chunks of
     * {@link #CHUNK_SIZE}.
     */
    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<EmployeesDTO, EmployeesDTO>chunk(CHUNK_SIZE)
                // The arguments are placeholders: the reader is @StepScope, so the real
                // limit/offset are late-bound from each partition's step execution context.
                .reader(EmployeeReader(null, null))
                // The processor bean was declared but never wired in, so the processing stage
                // (presumably where the Mongo POJO mapping happens; confirm) never ran.
                .processor(EmployeeProcessor())
                .writer(EmployeeWriter())
                .build();
    }

    /**
     * Creates the step-scoped reader for one partition.
     *
     * <p>Paging is applied to employees, not to joined rows. Applying LIMIT/OFFSET to the joined
     * result could split one employee's e-mail rows across two partitions, which makes merging
     * them impossible. The join must be a LEFT JOIN. With a FULL OUTER JOIN against the paged
     * subquery, every partition would also receive all e-mail rows whose employee is not in that
     * page.
     *
     * @param limit number of employees in this partition, from the step execution context
     * @param offset index of the first employee of this partition, from the step execution context
     * @return a cursor reader over this partition's employees and their e-mail rows
     * @throws Exception if the reader's property validation fails
     */
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader<EmployeesDTO> EmployeeReader(
            @Value("#{stepExecutionContext['limit']}") Long limit,
            @Value("#{stepExecutionContext['offset']}") Long offset) throws Exception {
        String sql = "select * from (select * from root.Employees "
                + "order by Employee_id limit ? offset ?) c "
                + "left join root.Employee_email ce "
                + "on c.Employee_id = ce.Employee_id "
                + "order by c.Employee_id";
        logger.info("Employees SQL = {} ", sql);
        logger.info("Employees partition limit = {}, offset = {}", limit, offset);
        JdbcCursorItemReader<EmployeesDTO> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setSql(sql);
        // Bind the paging values as statement parameters instead of concatenating them into the
        // SQL text. The null check runs when the cursor is opened, not during bean creation.
        reader.setPreparedStatementSetter(ps -> {
            if (limit == null || offset == null) {
                throw new IllegalStateException(
                        "Partition is missing 'limit'/'offset' in its step execution context: limit="
                                + limit + ", offset=" + offset);
            }
            ps.setLong(1, limit);
            ps.setLong(2, offset);
        });
        reader.setRowMapper(new EmployeeRowMapper());
        reader.afterPropertiesSet();
        return reader;
    }

    /** Item processor applied to every item read by {@code slaveStep}. */
    @Bean
    public ItemProcessor<EmployeesDTO, EmployeesDTO> EmployeeProcessor() {
        return new EmployeesProcessor();
    }

    /** Item writer that receives each processed chunk of {@code slaveStep}. */
    @Bean
    public ItemWriter<EmployeesDTO> EmployeeWriter() {
        return new EmployeeWriter();
    }
}
EmployeeRowMapper.java
/**
 * Maps one row of the employee / employee_email join to an {@link Employee}.
 *
 * <p>NOTE(review): this is where the "only one e-mail survives" symptom comes from. A RowMapper
 * maps a single database row. Every call below builds a brand-new list containing only that row's
 * e-mail, so an employee with three e-mail rows becomes three separate Employee objects with one
 * e-mail each. Merging them requires an aggregating reader, or fetching the e-mails separately
 * (driving query pattern).
 */
public class EmployeeRowMapper implements RowMapper<Employee> {
@Override
public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {
// EmployeeEmail email = new EmployeeEmail();
// NOTE(review): the declaration of `email` above is commented out, yet `email` is used below;
// presumably it is declared and populated in the elided lines. Confirm.
....
....
....
....
....
// A fresh single-element list per row: e-mails from the employee's other rows are never added.
List<EmployeeEmail> employeeEmails = new ArrayList<>();
employeeEmails.add(email);
Employee dto = Employee.builder()
// NOTE(review): an empty column label is not expected to match any result-set column;
// presumably a placeholder left from trimming the snippet. Confirm.
.businessTitle(rs.getString(""))
...........
...........
...........
...........
...........
...........
...........
.employeeEmails(employeeEmails)
.build();
return dto;
}
}
RowMapper 用于将 *单个* 数据库行映射到一个 POJO。因此,除非每一行在不同的列中包含所有电子邮件(例如 id,name,email1,email2,email3),否则您尝试执行的操作将不起作用。
如果每个条目对应 3 行电子邮件数据,那么您只需在驱动查询中返回 id,name,然后使用额外的查询来获取电子邮件。这个附加查询可以在映射器本身或条目处理器(item processor)中完成,如 driving query pattern(驱动查询模式)中所述。