重新启动失败的 Spring (Boot) 命令行启动的批处理作业
Restarting a failed Spring (Boot) Batch job lauched by command line
我知道有很多关于这个主题的问题和帖子(除了自己的 Spring 文档),但我承认我仍然没有弄清楚作业重新启动是如何工作的。
首先,我使用 Spring Boot 创建我的批处理程序。我的pom.xml相关部分如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
...
<dependencies>
<!-- Spring -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Databases -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<!-- Misc -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
主要class注解@EnableBatchProcessing:
@SpringBootApplication
@EnableBatchProcessing
public class Migrator {
public static void main( String[] args ) {
SpringApplication.run( Migrator.class, args );
}
}
正确地说,作业配置如下:
@Configuration
public class EnigmaPartitionedJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
...
@Bean
public Job configureJob( @Qualifier( "constraintsTurnOffStep" ) Step constraintsTurnOffStep,
@Qualifier( "constraintsTurnOnStep" ) Step constraintsTurnOnStep,
@Qualifier( "partitionerStep" ) Step partitionerStep ) {
return jobBuilderFactory
.get( "migratorPartitionedJob" )
.incrementer( new RunIdIncrementer() )
.start( constraintsTurnOffStep )
.next( partitionerStep )
.next( constraintsTurnOnStep )
.build();
}
...
}
因此,该作业由三个步骤组成。第一个和最后一个非常简单。它们只是分别关闭和打开目标数据库中的一些约束。中间一步是核心业务。它读取描述源数据库的输入文件,并应用基于 table 的分区。也就是说,迁移步骤的副本是 运行 在每个 table 源数据库的单独线程上:
@Bean( "partitionerStep" )
public Step partitionerStep( @Qualifier( "migrationAndAnonymizationStep" ) Step migrationAndAnonymizationStep, TaskExecutor taskExecutor ) {
return stepBuilderFactory
.get( "partitionerStep" )
.partitioner( "migrationAndAnonymizationStep", new MultiTablePartitioner( this.model.getEntities() ) )
.step( migrationAndAnonymizationStep )
.taskExecutor( taskExecutor )
.gridSize( this.model.getEntitiesCount() )
.build();
}
最后配置迁移步骤如下:
@Bean( "migrationAndAnonymizationStep" )
public Step migrationAndAnonymizationStep( MigrationAndAnonymizationReader reader,
MigrationAndAnonymizationProcessor processor,
MigrationAndAnonymizationWriter writer,
TaskExecutor taskExecutor ) {
return stepBuilderFactory
.get( "migrationAndAnonymizationStep" )
.<Map<String, Object>, Map<String, Object>>chunk( 50 )
.reader( reader )
.processor( processor )
.writer( writer )
.taskExecutor( taskExecutor )
.throttleLimit( 1 )
.build();
}
MigrationAndAnonymizationReader
基本上是 JdbcCursorItemReader
由于分区而具有一些配置,而 MigrationAndAnonymizationWriter
基本上是 FlatFileItemWriter
的子 class 具有一些小的初始化,就像 MigrationAndAnonymizationReader
.
作业执行 运行 使用命令行:
$ java -jar migrator-0.0.1-SNAPSHOT.jar --spring.profiles.active=[list of profiles] --migrator.source-username=XXX --migrator.source-password=XXX --migrator.target-username=XXX --migrator.target-password=XXX --spring.datasource.username=XXX --spring.datasource.password=XXX --migrator.model-path=[path to a specific input file]
我检查重启功能的测试包括 运行 使用与前一个命令类似的命令来执行作业,并且在执行过程中(处理我正在使用的测试数据库通常需要 2 分钟) ,我杀死了这个过程。然后,我执行相同的命令行,我的期望是作业会重新启动,而不是执行已完成的步骤和未完成的步骤,从失败前的位置开始执行。然而,每次我尝试这个测试时,我看到的是作业完全从头开始执行。
那么,我在这里缺少什么? Spring Boot/Batch 希望我采取什么行动、实施或配置?
Spring 由于这一行,批处理开始了一个新的工作:
.incrementer( new RunIdIncrementer() )
这会为每个 运行 生成一个唯一的 ID。
如果你想重启一个作业,你必须确保你传递给作业的参数是相同的。所以你不能使用 RunIdIncrementer
我知道有很多关于这个主题的问题和帖子(除了自己的 Spring 文档),但我承认我仍然没有弄清楚作业重新启动是如何工作的。
首先,我使用 Spring Boot 创建我的批处理程序。我的pom.xml相关部分如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
...
<dependencies>
<!-- Spring -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Databases -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<!-- Misc -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
主要class注解@EnableBatchProcessing:
@SpringBootApplication
@EnableBatchProcessing
public class Migrator {
public static void main( String[] args ) {
SpringApplication.run( Migrator.class, args );
}
}
正确地说,作业配置如下:
@Configuration
public class EnigmaPartitionedJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
...
@Bean
public Job configureJob( @Qualifier( "constraintsTurnOffStep" ) Step constraintsTurnOffStep,
@Qualifier( "constraintsTurnOnStep" ) Step constraintsTurnOnStep,
@Qualifier( "partitionerStep" ) Step partitionerStep ) {
return jobBuilderFactory
.get( "migratorPartitionedJob" )
.incrementer( new RunIdIncrementer() )
.start( constraintsTurnOffStep )
.next( partitionerStep )
.next( constraintsTurnOnStep )
.build();
}
...
}
因此,该作业由三个步骤组成。第一个和最后一个非常简单。它们只是分别关闭和打开目标数据库中的一些约束。中间一步是核心业务。它读取描述源数据库的输入文件,并应用基于 table 的分区。也就是说,迁移步骤的副本是 运行 在每个 table 源数据库的单独线程上:
@Bean( "partitionerStep" )
public Step partitionerStep( @Qualifier( "migrationAndAnonymizationStep" ) Step migrationAndAnonymizationStep, TaskExecutor taskExecutor ) {
return stepBuilderFactory
.get( "partitionerStep" )
.partitioner( "migrationAndAnonymizationStep", new MultiTablePartitioner( this.model.getEntities() ) )
.step( migrationAndAnonymizationStep )
.taskExecutor( taskExecutor )
.gridSize( this.model.getEntitiesCount() )
.build();
}
最后配置迁移步骤如下:
@Bean( "migrationAndAnonymizationStep" )
public Step migrationAndAnonymizationStep( MigrationAndAnonymizationReader reader,
MigrationAndAnonymizationProcessor processor,
MigrationAndAnonymizationWriter writer,
TaskExecutor taskExecutor ) {
return stepBuilderFactory
.get( "migrationAndAnonymizationStep" )
.<Map<String, Object>, Map<String, Object>>chunk( 50 )
.reader( reader )
.processor( processor )
.writer( writer )
.taskExecutor( taskExecutor )
.throttleLimit( 1 )
.build();
}
MigrationAndAnonymizationReader
基本上是 JdbcCursorItemReader
由于分区而具有一些配置,而 MigrationAndAnonymizationWriter
基本上是 FlatFileItemWriter
的子 class 具有一些小的初始化,就像 MigrationAndAnonymizationReader
.
作业执行 运行 使用命令行:
$ java -jar migrator-0.0.1-SNAPSHOT.jar --spring.profiles.active=[list of profiles] --migrator.source-username=XXX --migrator.source-password=XXX --migrator.target-username=XXX --migrator.target-password=XXX --spring.datasource.username=XXX --spring.datasource.password=XXX --migrator.model-path=[path to a specific input file]
我检查重启功能的测试包括 运行 使用与前一个命令类似的命令来执行作业,并且在执行过程中(处理我正在使用的测试数据库通常需要 2 分钟) ,我杀死了这个过程。然后,我执行相同的命令行,我的期望是作业会重新启动,而不是执行已完成的步骤和未完成的步骤,从失败前的位置开始执行。然而,每次我尝试这个测试时,我看到的是作业完全从头开始执行。
那么,我在这里缺少什么? Spring Boot/Batch 希望我采取什么行动、实施或配置?
Spring 由于这一行,批处理开始了一个新的工作:
.incrementer( new RunIdIncrementer() )
这会为每个 运行 生成一个唯一的 ID。
如果你想重启一个作业,你必须确保你传递给作业的参数是相同的。所以你不能使用 RunIdIncrementer