由于 Spring Batch 中的块大小(chunk size)而导致记录丢失

Records missing due to chunk count in Spring Batch

我们有一个批处理作业来加载数百万个具有多个地址的员工数据,当我使用块时无法加载几行。

For example, if I use a chunk size of 5, we lose the 6th record, which is associated with the employee in the 5th row (refer to the image).

请提出解决方案。这是 Spring Batch 的代码:

    /**
     * Spring Batch configuration for the job that reads {@link Employee} items through a
     * MyBatis cursor, converts each one to a JSON string and writes the strings to a file.
     */
    @Configuration
public class EmployeeJobMyBatis {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private EmployeeDataSourceConfig datasourceConfig;

    // NOTE(review): never assigned or read inside this class; kept because the field is
    // package-visible and other code may refer to it.
    EmployeeRowMapper rowMapper = null;

    private static final Logger LOG = LogManager.getLogger(EmployeeJobMyBatis.class);

    /**
     * Defines the job named "MyBatisJob", which consists of the single step built by {@link #step()}.
     *
     * @return the configured job
     * @throws Exception if the step cannot be built
     */
    @Bean
    @Qualifier("MyBatisJob")
    public Job mybatisJob() throws Exception {
        return this.jobBuilderFactory.get("MyBatisJob")
                .incrementer(new RunIdIncrementer())
                .start(step())
                .build();
    }

    /**
     * Defines the chunk-oriented step: read an {@link Employee}, turn it into a JSON string,
     * write the strings. Items are committed in chunks of 5.
     *
     * @return the configured step
     * @throws Exception if the reader cannot be built
     */
    @Bean
    public Step step() throws Exception {
        return this.stepBuilderFactory.get("EmployeeDataReadStep")
                .<Employee, String>chunk(5)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    /**
     * Builds the MyBatis session factory on top of the employee data source and registers the
     * mapper file {@code employee.xml}.
     *
     * @return the session factory
     * @throws Exception if the mapper resource cannot be resolved or the factory cannot be created
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        PathMatchingResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
        SqlSessionFactoryBean ss = new SqlSessionFactoryBean();
        ss.setDataSource(datasourceConfig.getDataSource());
        ss.setMapperLocations(resourcePatternResolver.getResources("employee.xml"));
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        configuration.setDefaultExecutorType(ExecutorType.BATCH);
        ss.setConfiguration(configuration);
        return ss.getObject();
    }

    /**
     * Cursor-based reader that runs the mapped statement {@code employeeData}.
     *
     * @return the item reader
     * @throws Exception if the session factory cannot be created
     */
    @Bean
    public MyBatisCursorItemReader<Employee> reader() throws Exception {
        MyBatisCursorItemReader<Employee> reader = new MyBatisCursorItemReader<Employee>();
        reader.setSqlSessionFactory(sqlSessionFactory());
        // Statement ids are case-sensitive: the mapper declares <select id="employeeData">.
        reader.setQueryId("employeeData");
        return reader;
    }

    /**
     * Processor that serialises each employee to JSON.
     *
     * @return the item processor
     */
    @Bean
    public DataProcessor processor() {
        return new DataProcessor();
    }

    /**
     * Writer that emits one JSON string per line, rolling over to a new output resource after
     * 2,500,000 items.
     *
     * @return the item writer
     */
    @Bean
    public MultiResourceItemWriter<String> writer() {
        FlatFileItemWriter<String> fileWriter = new FlatFileItemWriter<String>();
        fileWriter.setLineAggregator(new MyDelimitedLineAggregator());

        MultiResourceItemWriter<String> writer = new MultiResourceItemWriter<String>();
        writer.setResource(new FileSystemResource("C:/data/Employee.json"));
        writer.setItemCountLimitPerResource(2500000);
        writer.setDelegate(fileWriter);
        return writer;
    }

}


 /**
  * Item processor that serialises an {@link Employee} to its JSON representation.
  */
 public class DataProcessor implements ItemProcessor<Employee, String> {

     private static final Gson gson = new GsonBuilder().create();

     /**
      * Converts one employee to JSON.
      *
      * @param employee the item handed over by the reader
      * @return the JSON text, or {@code null} when the employee is missing or has no id;
      *         a {@code null} result tells Spring Batch to filter the item out
      * @throws Exception declared by the {@link ItemProcessor} contract
      */
     @Override
     public String process(Employee employee) throws Exception {
         // Employee exposes its identifier as getEmplId(); there is no getId().
         if (employee == null || employee.getEmplId() == null) {
             return null;
         }
         return gson.toJson(employee);
     }

 }

/**
 * Line aggregator for items that already are complete output lines: the incoming string is
 * returned as-is instead of being split into delimited fields.
 */
public class MyDelimitedLineAggregator extends DelimitedLineAggregator<String> {

    /** Last non-null line passed to {@link #aggregate(String)}; empty until the first one arrives. */
    String returnString = "";

    /**
     * Returns the given line unchanged and remembers it.
     *
     * @param jsonstr the line to write; may be {@code null}
     * @return {@code jsonstr}, or the previously remembered line when {@code jsonstr} is {@code null}
     */
    @Override
    public String aggregate(String jsonstr) {
        if (jsonstr == null) {
            return returnString;
        }
        returnString = jsonstr;
        return jsonstr;
    }
}



/**
 * An employee together with the wrapper object that holds its addresses.
 */
public class Employee {

    String emplId;
    Addresses addressList;

    /**
     * @return the employee identifier, or {@code null} if none has been set
     */
    public String getEmplId() {
        return emplId;
    }

    /**
     * @param value the employee identifier (a {@code String}, matching the {@code emplId} field)
     */
    public void setEmplId(String value) {
        this.emplId = value;
    }

    /**
     * @return the address wrapper, or {@code null} if none has been set
     */
    public Addresses getAddressList() {
        return addressList;
    }

    /**
     * @param value the address wrapper to attach to this employee
     */
    public void setAddressList(Addresses value) {
        this.addressList = value;
    }
}
/**
 * Wrapper around the list of {@link Address} entries of one employee.
 */
public class Addresses {

    List<Address> addresses;

    /**
     * Returns the live address list, creating an empty one on first access.
     *
     * @return the backing list, never {@code null}
     */
    public List<Address> getAddresses() {
        if (this.addresses != null) {
            return this.addresses;
        }
        this.addresses = new ArrayList<Address>();
        return this.addresses;
    }
}
 public class Address{
   String addressLineOne;
   String city;
   String country;

   public String getAddressLineOne(){
       return addressLineOne;
   }

   public void setAddressLineOne(String value) {
       this.addressLineOne = value;
   }

   public String getCity(){
       return city;
   }

   public void setCity(String value) {
       this.city = value;
   }
   public String getCountry(){
       return country;
   }

   public void setCountry(String value) {
       this.country = value;
   }
}

这是MyBatis Mapper xml

Employee.xml

<!--
  Folds the joined employee/address rows into one Employee per emplId; the address
  columns of each row are collected into addressList.addresses.
-->
<resultMap id="EmployeeMap" type="Employee">
  <id column="emplId" property="emplId"/>
  <collection property="addressList.addresses"
      javaType="list" ofType="Address">
    <result column="addressLineOne" property="addressLineOne"/>
    <result column="city" property="city"/>
    <result column="country" property="country"/>
  </collection>
</resultMap>
<!--
  This statement is read through a MyBatis cursor. Because the result map contains a
  collection, all rows of one employee must arrive consecutively: the query is ordered
  by the id column and flagged resultOrdered="true" so each Employee is completed
  before the next one starts.
-->
<select id="employeeData" resultMap="EmployeeMap" resultOrdered="true">
  select * from employee e left join address a on a.emplId = e.emplId order by e.emplId
</select>

面向块的步骤逐行读取并将每一行映射到域对象(基本上是一对一映射)。在你的情况下,你有一对多的关系。所以你的步骤配置不会像现在这样工作。您需要做的是按如下方式实现 driving query pattern

  • 让 reader 只读取员工的详细信息(不包括地址)
  • 使用项目处理器获取当前员工的地址

编辑:添加示例

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.support.DataAccessUtils;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

/**
 * Self-contained sample of the driving-query pattern: the reader loads only the
 * {@code person} rows and the processor looks up the address of each person it is handed.
 * {@link #main(String[])} creates and fills the two tables in an embedded H2 database and
 * then runs the job once.
 */
@Configuration
@EnableBatchProcessing
public class MyJob {

    /** Lookup executed by the processor for every person. */
    private static final String ADDRESS_BY_PERSON_SQL = "select * from address where personId = ?";

    /**
     * Embedded H2 database initialised with the Spring Batch meta-data schema.
     *
     * @return the data source shared by the job repository, the reader and the processor
     */
    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    /**
     * @param dataSource the data source to query
     * @return a JDBC template bound to {@code dataSource}
     */
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    /**
     * Driving query: reads only the id and name of each person, one row per item.
     *
     * @return the cursor-based person reader
     */
    @Bean
    public JdbcCursorItemReader<Person> itemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql("select id, name from person")
                .beanRowMapper(Person.class)
                .build();
    }

    /**
     * Enriches each person with its address.
     *
     * <p>A person without an address row keeps a {@code null} address instead of failing the
     * step; more than one matching row is still reported as an error.
     *
     * @return the enriching processor
     */
    @Bean
    public ItemProcessor<Person, Person> itemProcessor() {
        // Resolved through the configuration class, the same way itemReader() obtains dataSource().
        JdbcTemplate jdbcTemplate = jdbcTemplate(dataSource());
        BeanPropertyRowMapper<Address> addressMapper = new BeanPropertyRowMapper<>(Address.class);
        return person -> {
            // singleResult: null for no row, the row for exactly one, exception for several.
            Address address = DataAccessUtils.singleResult(
                    jdbcTemplate.query(ADDRESS_BY_PERSON_SQL, addressMapper, person.getId()));
            person.setAddress(address);
            return person;
        };
    }

    /**
     * @return a writer that prints every item of the chunk to standard output
     */
    @Bean
    public ItemWriter<Person> itemWriter() {
        return items -> {
            for (Person item : items) {
                System.out.println("item = " + item);
            }
        };
    }

    /**
     * Single-step job: read persons, attach addresses, print them, two items per chunk.
     *
     * @param jobs  factory for job builders
     * @param steps factory for step builders
     * @return the configured job
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Person, Person>chunk(2)
                        .reader(itemReader())
                        .processor(itemProcessor())
                        .writer(itemWriter())
                        .build())
                .build();
    }

    /**
     * Boots the context, creates and populates the sample tables, and launches the job.
     *
     * @param args unused
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context when the run is over.
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class)) {
            JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
            jdbcTemplate.update("CREATE TABLE address (id INT IDENTITY NOT NULL PRIMARY KEY, personId INT, street VARCHAR(20));");
            jdbcTemplate.update("CREATE TABLE person (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (1,1, 'oxford street');");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (2,2, 'howard street');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (1, 'foo');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (2, 'bar');");
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /** A row of the {@code person} table plus the address attached by the processor. */
    public static class Person {
        private long id;
        private String name;
        private Address address;

        public Person() {
        }

        public Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Address getAddress() {
            return address;
        }

        public void setAddress(Address address) {
            this.address = address;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", address=" + address +
                    '}';
        }
    }

    /** A row of the {@code address} table. */
    public static class Address {
        private int id;
        private int personId;
        private String street;

        public Address() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getStreet() {
            return street;
        }

        public void setStreet(String street) {
            this.street = street;
        }

        public int getPersonId() {
            return personId;
        }

        public void setPersonId(int personId) {
            this.personId = personId;
        }

        @Override
        public String toString() {
            return "Address{" +
                    "id=" + id +
                    ", street='" + street + '\'' +
                    '}';
        }
    }

}

此示例读取 person/address 数据。reader 只读取人员的 id 和 name,处理器再获取当前项目的地址。