Spring Batch Partitions Join 覆盖 RowMapper 值因此得到单个数组而不是多个

Spring Batch Partitions Join overriding the RowMapper Values hence getting single array instead of multiple

我正在使用 Spring BatchPostgres 读取数据并将其写入 MongoDB。在我的例子中,Employee 有 3 种不同类型的电子邮件地址 1) 家庭住址 2) 办公地址 3) 来自员工电子邮件 Table 的社交地址。

因为我们在数据库中几乎有 10 lacs 名员工,因此使用 Custom PartitionsPostgres 中提取数据并使用 employee_email(稍后还有 employee_phone),这样在处理器中将有一个 Mongo POJO 的映射并保存到 MongoDB 中。

现在的问题是我需要将员工电子邮件记录作为数组嵌入到联系人中,但根据当前逻辑将其保存为单独的集合

我们如何解决这个问题?

select * from root.employees c
full outer join root.employee_email ce
on c.employee_id = ce.employee_id
order by c.employee_id limit 1000 offset 0;

现在当数据保存到数据库中时,只有电子邮件被保存并且其他 2 个似乎被覆盖。

我需要如何处理,看起来 EmployeeRowMapper 正在覆盖所有其他电子邮件地址。我将如何解决这个问题?

Employee.Job

@Configuration
public class EmployeeJob {
    private static Logger logger = LoggerFactory.getLogger(EmployeeJob.class);

    private static final Integer CHUNK_SIZE = 1000;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public EmployeesPartitions EmployeesPartition() {
        return new EmployeesPartitions();
    }

    @Bean
    public EmployeesJobListener EmployeesJobListener() {
        return new EmployeesJobListener();
    }

    @Bean("readEmployeeJob")
    public Job readEmployeeJob() throws Exception {
        return jobBuilderFactory.get("readEmployeeJob")
                .incrementer(new RunIdIncrementer())
                .start(EmployeeStepOne())
                .listener(EmployeesJobListener())
                .build();
    }

    @Bean
    public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("fac-thrd-");
        taskExecutor.setConcurrencyLimit(Runtime.getRuntime().availableProcessors());
        taskExecutor.setThreadGroupName("Employees-Thread");
        taskExecutor.setDaemon(false);
        taskExecutor.setThreadPriority(5);
        return taskExecutor;
    }

    @Bean
    public Step EmployeeStepOne() throws Exception {            
        return stepBuilderFactory.get("EmployeeStepOne")
                .partitioner(slaveStep().getName(), EmployeesPartition())
                .step(slaveStep())
                .gridSize(10)
                .taskExecutor(simpleAsyncTaskExecutor())
                .build();
    }

    // slave step
    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<EmployeesDTO, EmployeesDTO>chunk(CHUNK_SIZE)
                .reader(EmployeeReader(null, null))
                .writer(EmployeeWriter())
                .build();
    }


    // Readers
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader<EmployeesDTO> EmployeeReader(
            @Value("#{stepExecutionContext['limit']}") Long limit,
            @Value("#{stepExecutionContext['offset']}") Long offset) throws Exception {

        String sql = "select * from root.Employees c "
                + "full outer join root.Employee_email ce "
                + "on c.Employee_id = ce.Employee_id "
                + "order by c.Employee_id limit " + limit +" offset "+ offset;
        logger.info("Employees SQL = {} ", sql);
        JdbcCursorItemReader<EmployeesDTO> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setSql(sql);
        reader.setRowMapper(new EmployeeRowMapper());
        reader.afterPropertiesSet();
        return reader;
    }

    // Processors
    @Bean
    public ItemProcessor<EmployeesDTO, EmployeesDTO> EmployeeProcessor() {
        return new EmployeesProcessor();
    }

    // Writers
    @Bean
    public ItemWriter<EmployeesDTO> EmployeeWriter() {
        return new EmployeeWriter();
    }
}

RowMapper.java

public class EmployeeRowMapper implements RowMapper<Employee> {

    @Override
    public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {
        // EmployeeEmail email = new EmployeeEmail();
        ....
        ....
        ....
        ....
        ....
        List<EmployeeEmail> employeeEmails = new ArrayList<>();
        employeeEmails.add(email);

        Employee dto = Employee.builder()
                .businessTitle(rs.getString(""))
                ...........
                ...........
                ...........
                ...........
                ...........
                ...........
                ...........
                .employeeEmails(employeeEmails)
                .build();

        return dto;
    }
}

A​​ RowMapper 用于将 单个 数据库行映射到 POJO。因此,除非每行包含不同列中的所有电子邮件,例如 id,name,email1,email2,email3,否则您尝试执行的操作将不起作用。

如果对于每个项目,您有 3 行电子邮件,您只需要 return id,name 查询并使用额外的查询来获取电子邮件。这个附加查询可以在映射器本身或项目处理器中完成,如 driving query pattern.

中所述