Spring Batch - My batch seems to be executing two steps at the same time?
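I really don't understand what is going on here. I'm studying Spring Batch and, for various reasons, I want to execute two steps, one after the other.
Please don't mind what the steps currently do; just keep in mind that I want to execute the two steps sequentially.
Here is the code: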
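import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableBatchProcessing
public class JobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // In-memory list shared by the two steps: filled by filterStep, read by insertToDBStep.
    private List<Employee> employeesToSave = new ArrayList<Employee>();

    public JsonItemReader<Employee> jsonReader() {
        System.out.println("Try to read JSON");
        final ObjectMapper mapper = new ObjectMapper();
        final JacksonJsonObjectReader<Employee> jsonObjectReader = new JacksonJsonObjectReader<>(
                Employee.class);
        jsonObjectReader.setMapper(mapper);
        return new JsonItemReaderBuilder<Employee>().jsonObjectReader(jsonObjectReader)
                .resource(new ClassPathResource("input.json"))
                .name("myReader")
                .build();
    }

    public ListItemReader<Employee> listReader() {
        System.out.println("Read from list");
        return new ListItemReader<Employee>(employeesToSave);
    }

    public ItemProcessor<Employee, Employee> filterProcessor() {
        return employee -> {
            System.out.println("Processing JSON");
            return employee;
        };
    }

    public ItemWriter<Employee> filterWriter() {
        return listEmployee -> {
            employeesToSave.addAll(listEmployee);
            System.out.println("Save on list " + listEmployee.toString());
        };
    }

    public ItemWriter<Employee> insertToDBWriter() {
        System.out.println("Try to save on DB");
        return listEmployee -> {
            System.out.println("Save on DB " + listEmployee.toString());
        };
    }

    // Step 1: read the JSON file and collect the employees into employeesToSave.
    public Step filterStep() {
        StepBuilder stepBuilder = stepBuilderFactory.get("filterStep");
        SimpleStepBuilder<Employee, Employee> simpleStepBuilder = stepBuilder.chunk(5);
        return simpleStepBuilder.reader(jsonReader()).processor(filterProcessor()).writer(filterWriter()).build();
    }

    // Step 2: read the collected employees back from the list and "save" them.
    public Step insertToDBStep() {
        StepBuilder stepBuilder = stepBuilderFactory.get("insertToDBStep");
        SimpleStepBuilder<Employee, Employee> simpleStepBuilder = stepBuilder.chunk(5);
        return simpleStepBuilder.reader(listReader()).writer(insertToDBWriter()).build();
    }

    @Bean
    public Job myJob(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {
        return jobBuilderFactory.get("myJob").incrementer(new RunIdIncrementer())
                .start(filterStep())
                .next(insertToDBStep())
                .build();
    }
}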
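Why doesn't insertToDBStep start when filterStep ends? It actually looks as if filterStep were running at the same time. And why does it seem to start working right after the Root WebApplicationContext is initialized?
Here is the output: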
2022-05-23 15:40:49.418 INFO 14008 --- [ restartedMain] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1024 ms
Try to read JSON
Read from list
Try to save on DB
2022-05-23 15:40:49.882 INFO 14008 --- [ restartedMain] o.s.b.d.a.OptionalLiveReloadServer : LiveReload server is running on port 35729
2022-05-23 15:40:49.917 INFO 14008 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2022-05-23 15:40:49.926 INFO 14008 --- [ restartedMain] c.marco.firstbatch.TestBatchApplication : Started TestBatchApplication in 1.985 seconds (JVM running for 2.789)
2022-05-23 15:40:49.927 INFO 14008 --- [ restartedMain] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
2022-05-23 15:40:49.928 WARN 14008 --- [ restartedMain] o.s.b.c.c.a.DefaultBatchConfigurer : No datasource was provided...using a Map based JobRepository
2022-05-23 15:40:49.928 WARN 14008 --- [ restartedMain] o.s.b.c.c.a.DefaultBatchConfigurer : No transaction manager was provided, using a ResourcelessTransactionManager
2022-05-23 15:40:49.943 INFO 14008 --- [ restartedMain] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2022-05-23 15:40:49.972 INFO 14008 --- [ restartedMain] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=myJob]] launched with the following parameters: [{run.id=1}]
2022-05-23 15:40:50.003 INFO 14008 --- [ restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [filterStep]
Processing JSON
Processing JSON
Processing JSON
Processing JSON
Processing JSON
Save on list [com.marco.firstbatch.Employee@958d6e7, com.marco.firstbatch.Employee@464d17f8, com.marco.firstbatch.Employee@705520ac, com.marco.firstbatch.Employee@1a9f8e93, com.marco.firstbatch.Employee@55bf8cc9]
Processing JSON
Processing JSON
Save on list [com.marco.firstbatch.Employee@55d706c0, com.marco.firstbatch.Employee@1bc46dd4]
2022-05-23 15:40:50.074 INFO 14008 --- [ restartedMain] o.s.batch.core.step.AbstractStep : Step: [filterStep] executed in 70ms
2022-05-23 15:40:50.081 INFO 14008 --- [ restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [insertToDBStep]
2022-05-23 15:40:50.084 INFO 14008 --- [ restartedMain] o.s.batch.core.step.AbstractStep : Step: [insertToDBStep] executed in 3ms
2022-05-23 15:40:50.088 INFO 14008 --- [ restartedMain] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=myJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 96ms
Thanks in advance.
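The steps are executed correctly, in sequence. You put System.out.println statements in two "kinds" of places:
- in bean definition methods, which Spring Framework executes while it configures the application context
- in the code of batch artifacts (item processor, item writer), which Spring Batch calls when it runs your job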
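To make the two kinds of places concrete, here is a plain-Java sketch with no Spring involved (the class and method names are made up for illustration). The body of processor() plays the role of a bean definition method and runs once, when the object is built; the lambda it returns plays the role of ItemProcessor#process and runs only later, once per item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class ConfigVsRuntimeDemo {
    static final List<String> LOG = new ArrayList<>();

    // Plays the role of a bean definition method: this body runs once,
    // when the object is created ("configuration time").
    static UnaryOperator<String> processor() {
        LOG.add("processor bean created");
        // The lambda body does not run now; it runs later, once per item ("run time").
        return item -> {
            LOG.add("processing " + item);
            return item;
        };
    }

    public static void main(String[] args) {
        UnaryOperator<String> p = processor(); // configuration phase
        LOG.add("job started");                // the job is launched afterwards
        for (String item : List.of("a", "b")) {
            p.apply(item);                     // run phase
        }
        LOG.forEach(System.out::println);
    }
}
```

Running main logs "processor bean created" before "job started", even though both log statements live in the same method, which is the same pattern as in the output above.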
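In your case, Spring Framework calls the following bean definition methods to define the first step, filterStep():
- jsonReader(): prints "Try to read JSON". No file is read at this point; only the JSON reader bean is defined. A more accurate log message would be "json reader bean created".
- listReader(): prints "Read from list" (this is actually the reader of insertToDBStep, not of filterStep). Here too, nothing is read yet. A more accurate log message would be "list reader bean created".
- filterProcessor(): prints nothing. The log statement is inside the ItemProcessor#process method, which Spring Batch calls at runtime, not at this point of the configuration phase.
- filterWriter(): same here; the print statement is inside the write method, which is called at runtime, not at configuration time.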
This results in the following output:
Try to read JSON
Read from list
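Now Spring Framework starts defining the next step, insertToDBStep(). To do so, it calls the following methods, in the order given by your step definition:
- listReader(): this bean is already defined, so Spring reuses the same instance (by default, Spring beans are singletons). Hence, there is no output for this method.
- insertToDBWriter(): prints "Try to save on DB". Again, nothing is actually saved to the database here. A more accurate log message would be "insertToDBWriter bean created" (or, more accurately still, "attempting to create insertToDBWriter bean", in case the code that follows throws an exception).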
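The reuse of listReader() relies on the fact that, for @Bean methods in a @Configuration class, Spring intercepts repeated calls and returns the existing singleton instead of running the method body again. The following is a toy plain-Java model of that caching; the bean(...) helper and the map are hypothetical and are not how Spring implements it. The factory body, and therefore the print, runs only on the first request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class SingletonBeanModel {
    static final List<String> LOG = new ArrayList<>();
    // Hypothetical stand-in for the container's singleton cache (not Spring's real code).
    static final Map<String, Object> SINGLETONS = new HashMap<>();

    // Runs the factory only the first time a given bean name is requested.
    @SuppressWarnings("unchecked")
    static <T> T bean(String name, Supplier<T> factory) {
        return (T) SINGLETONS.computeIfAbsent(name, n -> factory.get());
    }

    // Plays the role of the listReader() bean method.
    static List<String> listReader() {
        return bean("listReader", () -> {
            LOG.add("Read from list");
            return new ArrayList<String>();
        });
    }

    public static void main(String[] args) {
        List<String> first = listReader();  // first request: the body runs and logs
        List<String> second = listReader(); // e.g. the call from insertToDBStep(): cached
        System.out.println("same instance: " + (first == second));
        System.out.println("log: " + LOG);
    }
}
```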
Your cumulative output is now:
Try to read JSON
Read from list
Try to save on DB
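At this point, Spring Framework has finished configuring the application context. Spring Boot's JobLauncherApplicationRunner then launches the job (see the "Running default command line with: []" line in your log), and Spring Batch takes over. The actual processing of filterStep() starts:
- the reader (JsonItemReader) prints nothing in its read method
- the processor prints "Processing JSON"
- the writer prints "Save on list ..."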
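Your input apparently contains 7 items, so with a chunk size of 5 you get two chunks (the first with 5 items, the second with 2), which results in the following output: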
2022-05-23 15:40:50.003 INFO 14008 --- [ restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [filterStep]
Processing JSON
Processing JSON
Processing JSON
Processing JSON
Processing JSON
Save on list [com.marco.firstbatch.Employee@958d6e7, com.marco.firstbatch.Employee@464d17f8, com.marco.firstbatch.Employee@705520ac, com.marco.firstbatch.Employee@1a9f8e93, com.marco.firstbatch.Employee@55bf8cc9]
Processing JSON
Processing JSON
Save on list [com.marco.firstbatch.Employee@55d706c0, com.marco.firstbatch.Employee@1bc46dd4]
2022-05-23 15:40:50.074 INFO 14008 --- [ restartedMain] o.s.batch.core.step.AbstractStep : Step: [filterStep] executed in 70ms
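The 5 + 2 pattern follows from chunk-oriented processing with chunk(5). As a simplified plain-Java model (no transactions, retries, skips or listeners; the class and method names are illustrative), a chunk step reads up to chunkSize items, processes each one, then writes the whole chunk in a single call, and repeats until the reader is exhausted:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ChunkLoopModel {
    // Simplified chunk-oriented step: no transactions, retries, skips or listeners.
    static List<String> runStep(Iterator<Integer> reader, int chunkSize) {
        List<String> log = new ArrayList<>();
        while (true) {
            // 1. Read up to chunkSize items (a real ItemReader signals the end by returning null).
            List<Integer> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize && reader.hasNext()) {
                chunk.add(reader.next());
            }
            if (chunk.isEmpty()) {
                return log; // reader exhausted: the step ends
            }
            // 2. Process each item of the chunk (this processor just passes items through).
            List<Integer> processed = new ArrayList<>();
            for (Integer item : chunk) {
                log.add("Processing JSON");
                processed.add(item);
            }
            // 3. Write the whole chunk with a single writer call.
            log.add("Save on list " + processed);
        }
    }

    public static void main(String[] args) {
        runStep(List.of(1, 2, 3, 4, 5, 6, 7).iterator(), 5).forEach(System.out::println);
    }
}
```

With seven items, runStep logs five "Processing JSON" lines, one write, two more "Processing JSON" lines and a second write. With an empty reader, the loop ends on its first iteration without writing anything.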
Then the next step starts, and you get the following output:
2022-05-23 15:40:50.081 INFO 14008 --- [ restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [insertToDBStep]
2022-05-23 15:40:50.084 INFO 14008 --- [ restartedMain] o.s.batch.core.step.AbstractStep : Step: [insertToDBStep] executed in 3ms
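Here you may ask why insertToDBWriter() does not write any items (that is, why there is no "Save on DB ..." log). The reason is that the list reader is created while the application context is being configured, before filterStep() has run. ListItemReader makes its own copy of the list passed to its constructor, and at that time employeesToSave is still empty. So when the second step calls the reader's read method, it returns null right away; since there are no items to process, the step ends immediately. If you want the second step to read the items that filterStep() added to the list, you can declare listReader() as a @Bean method and annotate it with @StepScope. The reader is then created lazily, once per step execution, when the step that uses it runs, so it copies the already populated list.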
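The difference between creating the reader up front and creating it when the step runs can be modeled in plain Java. The sketch below assumes a hypothetical CopyingListReader that, like ListItemReader, copies its source list in the constructor; the Supplier stands in, very roughly, for the deferred creation that @StepScope provides. It is not Spring code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class EagerVsLazyReader {
    // Hypothetical stand-in for ListItemReader: copies the source list when constructed.
    static class CopyingListReader<T> {
        private final List<T> items;

        CopyingListReader(List<T> source) {
            this.items = new ArrayList<>(source);
        }

        // Returns the next item, or null when exhausted (same contract as ItemReader#read).
        T read() {
            return items.isEmpty() ? null : items.remove(0);
        }
    }

    static int drain(CopyingListReader<String> reader) {
        int count = 0;
        while (reader.read() != null) {
            count++;
        }
        return count;
    }

    // Returns {items seen by the eager reader, items seen by the lazy reader}.
    static int[] simulate() {
        List<String> employeesToSave = new ArrayList<>();

        // Eager: built during configuration, while the shared list is still empty.
        CopyingListReader<String> eager = new CopyingListReader<>(employeesToSave);
        // Lazy: construction is deferred until the second step asks for the reader.
        Supplier<CopyingListReader<String>> lazy = () -> new CopyingListReader<>(employeesToSave);

        // "filterStep" fills the shared list.
        employeesToSave.addAll(List.of("e1", "e2", "e3"));

        // "insertToDBStep" reads from each reader.
        return new int[] {drain(eager), drain(lazy.get())};
    }

    public static void main(String[] args) {
        int[] counts = simulate();
        System.out.println("eager reader read " + counts[0] + " items");
        System.out.println("lazy reader read " + counts[1] + " items");
    }
}
```

Running main reports 0 items for the eager reader and 3 for the lazy one.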