如何定期加载多个文件并从中断处继续?
How to load many files regularly and pick up where it left off?
一个目录下有很多文件通过Spring批量加载,定时加载。
我的 Spring Batch Jar 由外部调度程序启动。
如果文件中有错误,我希望加载其他文件,并在错误文件停止的地方继续加载(在更正我的文件后)
这是我的一个非常简单的例子:
SpringBatchSimpleTestApplication.java
@SpringBootApplication
public class SpringBatchSimpleTestApplication {
private static final Logger log = LoggerFactory.getLogger(SpringBatchSimpleTestApplication.class);
public static void main(String[] args) throws Exception {
System.exit(SpringApplication.exit(SpringApplication.run(SpringBatchSimpleTestApplication.class, args)));
}
}
PersonJobConfiguration.java
public class PersonJobConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Value("classpath*:person*.csv")
private Resource[] inputFiles;
@Bean
public MultiResourceItemReader<Person> multiResourceItemReader1() {
return new MultiResourceItemReaderBuilder<Person>()
.name("multiResourceItemReader1")
.delegate(reader1())
.resources(inputFiles)
.build();
}
@Bean
public FlatFileItemReader<Person> reader1() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.lineTokenizer(new FixedLengthTokenizer() {{ setNames("firstName", "lastName", "age'"); setColumns(new Range(1,4), new Range(5,6),new Range(7)); }})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person.class); }})
.build();
}
@Bean
public JdbcBatchItemWriter<Person> writer1(DataSource dataSource) {
JdbcBatchItemWriter<Person> jdbcBatchItemWriter = new JdbcBatchItemWriter<>();
jdbcBatchItemWriter.setAssertUpdates(true);
jdbcBatchItemWriter.setDataSource(dataSource);
jdbcBatchItemWriter.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
jdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
return jdbcBatchItemWriter;
}
@Bean
public Job person_job(JobEndListener listener, Step step1) {
return jobBuilderFactory.get("person_job")
//.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory.get("step1")
.<Person, Person> chunk(10)
.reader(multiResourceItemReader1())
.writer(writer)
.build();
}
}
Person.java
public class Person {
private String lastName;
private String firstName;
private String age;
public Person(String lastName, String firstName, String age) {
super();
this.lastName = lastName;
this.firstName = firstName;
this.age = age;
}
public String getAge() {
return age;
}
public void setAge(String age) {
this.age = age;
}
public Person() {
}
public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getFirstName() {
return firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@Override
public String toString() {
return "firstName: " + firstName + ", lastName: " + lastName;
}
}
我的问题:
如果我在作业中使用“.incrementer(new RunIdIncrementer())”,每次执行我的项目时都会创建一个新的作业。因此,如果加载失败,它会从文件的开头开始,而不是从停止的地方开始。
如果我不在作业中使用“.incrementer(new RunIdIncrementer())”,它会工作,它会从停止的地方开始,但是,如果我添加要加载的新文件,它们未加载,因为没有创建新作业。
如何加载同一目录中定期到达的一些文件?
在这种情况下使用“.incrementer(new RunIdIncrementer())”是最好的方法吗?如果没有,怎么办?
要从中断处继续加载,我不想输入每个作业 ID 以重新启动。我希望它自动完成。
我是否必须在数据库中获取失败的作业,然后重新启动它们?
如果可以,怎么办?
提前感谢您的帮助。
问候。
让事情变得复杂的是一个作业同时处理多个文件。
遵循 Unix 使一件事做一件事并将其做好的哲学,我建议将每个作业处理一个文件。您可以创建一个计划的方法来读取目录内容并为每个文件启动一个作业(唯一的文件名是一个识别作业参数)。成功加载的文件可以移动到另一个目录。现在如果一个作业失败,它不会影响其他文件的加载,并且在接下来的运行中,失败的文件将从它停止的地方重新启动。
这种方法有很多优点:
- 可扩展性:它允许您 运行 并行执行多个作业(MultiResource reader 方法将串行加载文件)
- Fault-tolerance:它允许您从中断处重新启动失败的文件
- 它不需要增量器。
- 等等
作业参数的选择对于正确设计可重启的 Spring 批处理作业至关重要。
一个目录下有很多文件通过Spring批量加载,定时加载。 我的 Spring Batch Jar 由外部调度程序启动。 如果文件中有错误,我希望加载其他文件,并在错误文件停止的地方继续加载(在更正我的文件后)
这是我的一个非常简单的例子:
SpringBatchSimpleTestApplication.java
@SpringBootApplication
public class SpringBatchSimpleTestApplication {
private static final Logger log = LoggerFactory.getLogger(SpringBatchSimpleTestApplication.class);
public static void main(String[] args) throws Exception {
System.exit(SpringApplication.exit(SpringApplication.run(SpringBatchSimpleTestApplication.class, args)));
}
}
PersonJobConfiguration.java
public class PersonJobConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Value("classpath*:person*.csv")
private Resource[] inputFiles;
@Bean
public MultiResourceItemReader<Person> multiResourceItemReader1() {
return new MultiResourceItemReaderBuilder<Person>()
.name("multiResourceItemReader1")
.delegate(reader1())
.resources(inputFiles)
.build();
}
@Bean
public FlatFileItemReader<Person> reader1() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.lineTokenizer(new FixedLengthTokenizer() {{ setNames("firstName", "lastName", "age'"); setColumns(new Range(1,4), new Range(5,6),new Range(7)); }})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person.class); }})
.build();
}
@Bean
public JdbcBatchItemWriter<Person> writer1(DataSource dataSource) {
JdbcBatchItemWriter<Person> jdbcBatchItemWriter = new JdbcBatchItemWriter<>();
jdbcBatchItemWriter.setAssertUpdates(true);
jdbcBatchItemWriter.setDataSource(dataSource);
jdbcBatchItemWriter.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
jdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
return jdbcBatchItemWriter;
}
@Bean
public Job person_job(JobEndListener listener, Step step1) {
return jobBuilderFactory.get("person_job")
//.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory.get("step1")
.<Person, Person> chunk(10)
.reader(multiResourceItemReader1())
.writer(writer)
.build();
}
}
Person.java
public class Person {
private String lastName;
private String firstName;
private String age;
public Person(String lastName, String firstName, String age) {
super();
this.lastName = lastName;
this.firstName = firstName;
this.age = age;
}
public String getAge() {
return age;
}
public void setAge(String age) {
this.age = age;
}
public Person() {
}
public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getFirstName() {
return firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@Override
public String toString() {
return "firstName: " + firstName + ", lastName: " + lastName;
}
}
我的问题:
如果我在作业中使用“.incrementer(new RunIdIncrementer())”,每次执行我的项目时都会创建一个新的作业。因此,如果加载失败,它会从文件的开头开始,而不是从停止的地方开始。
如果我不在作业中使用“.incrementer(new RunIdIncrementer())”,它会工作,它会从停止的地方开始,但是,如果我添加要加载的新文件,它们未加载,因为没有创建新作业。
如何加载同一目录中定期到达的一些文件? 在这种情况下使用“.incrementer(new RunIdIncrementer())”是最好的方法吗?如果没有,怎么办?
要从中断处继续加载,我不想输入每个作业 ID 以重新启动。我希望它自动完成。 我是否必须在数据库中获取失败的作业,然后重新启动它们? 如果可以,怎么办?
提前感谢您的帮助。 问候。
让事情变得复杂的是一个作业同时处理多个文件。
遵循 Unix 使一件事做一件事并将其做好的哲学,我建议将每个作业处理一个文件。您可以创建一个计划的方法来读取目录内容并为每个文件启动一个作业(唯一的文件名是一个识别作业参数)。成功加载的文件可以移动到另一个目录。现在如果一个作业失败,它不会影响其他文件的加载,并且在接下来的运行中,失败的文件将从它停止的地方重新启动。
这种方法有很多优点:
- 可扩展性:它允许您 运行 并行执行多个作业(MultiResource reader 方法将串行加载文件)
- Fault-tolerance:它允许您从中断处重新启动失败的文件
- 它不需要增量器。
- 等等
作业参数的选择对于正确设计可重启的 Spring 批处理作业至关重要。