重新启动失败的 Spring (Boot) 命令行启动的批处理作业

Restarting a failed Spring (Boot) Batch job lauched by command line

我知道有很多关于这个主题的问题和帖子(除了自己的 Spring 文档),但我承认我仍然没有弄清楚作业重新启动是如何工作的。

首先,我使用 Spring Boot 创建我的批处理程序。我的pom.xml相关部分如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    ...

    <dependencies>
        <!-- Spring -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Databases -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <!-- Misc -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>23.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

主要class注解@EnableBatchProcessing:

@SpringBootApplication
@EnableBatchProcessing
public class Migrator {

    public static void main( String[] args ) {
        SpringApplication.run( Migrator.class, args );
    }

}

正确地说,作业配置如下:


@Configuration
public class EnigmaPartitionedJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    ...

    @Bean
    public Job configureJob( @Qualifier( "constraintsTurnOffStep" ) Step constraintsTurnOffStep, 
                             @Qualifier( "constraintsTurnOnStep" ) Step constraintsTurnOnStep,
                             @Qualifier( "partitionerStep" ) Step partitionerStep ) {
        return jobBuilderFactory
                        .get( "migratorPartitionedJob" )
                        .incrementer( new RunIdIncrementer() )
                        .start( constraintsTurnOffStep )
                        .next( partitionerStep )
                        .next( constraintsTurnOnStep )
                        .build();
    }

    ...

}

因此,该作业由三个步骤组成。第一个和最后一个非常简单。它们只是分别关闭和打开目标数据库中的一些约束。中间一步是核心业务。它读取描述源数据库的输入文件,并应用基于 table 的分区。也就是说,迁移步骤的副本是 运行 在每个 table 源数据库的单独线程上:

    @Bean( "partitionerStep" )
    public Step partitionerStep( @Qualifier( "migrationAndAnonymizationStep" ) Step migrationAndAnonymizationStep, TaskExecutor taskExecutor ) {
        return stepBuilderFactory
                    .get( "partitionerStep" )
                    .partitioner( "migrationAndAnonymizationStep", new MultiTablePartitioner( this.model.getEntities() ) )
                    .step( migrationAndAnonymizationStep )
                    .taskExecutor( taskExecutor )
                    .gridSize( this.model.getEntitiesCount() )
                    .build();
    }

最后配置迁移步骤如下:

    @Bean( "migrationAndAnonymizationStep" )
    public Step migrationAndAnonymizationStep( MigrationAndAnonymizationReader reader, 
                                               MigrationAndAnonymizationProcessor processor, 
                                               MigrationAndAnonymizationWriter writer,  
                                               TaskExecutor taskExecutor ) {

        return stepBuilderFactory
                    .get( "migrationAndAnonymizationStep" )
                    .<Map<String, Object>, Map<String, Object>>chunk( 50 )
                    .reader( reader )
                    .processor( processor )
                    .writer( writer )
                    .taskExecutor( taskExecutor )
                    .throttleLimit( 1 )
                    .build();
    }

MigrationAndAnonymizationReader 基本上是 JdbcCursorItemReader 由于分区而具有一些配置,而 MigrationAndAnonymizationWriter 基本上是 FlatFileItemWriter 的子 class 具有一些小的初始化,就像 MigrationAndAnonymizationReader.

作业执行 运行 使用命令行:

$ java -jar migrator-0.0.1-SNAPSHOT.jar --spring.profiles.active=[list of profiles] --migrator.source-username=XXX --migrator.source-password=XXX --migrator.target-username=XXX --migrator.target-password=XXX --spring.datasource.username=XXX --spring.datasource.password=XXX --migrator.model-path=[path to a specific input file]

我检查重启功能的测试包括 运行 使用与前一个命令类似的命令来执行作业,并且在执行过程中(处理我正在使用的测试数据库通常需要 2 分钟) ,我杀死了这个过程。然后,我执行相同的命令行,我的期望是作业会重新启动,而不是执行已完成的步骤和未完成的步骤,从失败前的位置开始执行。然而,每次我尝试这个测试时,我看到的是作业完全从头开始执行。

那么,我在这里缺少什么? Spring Boot/Batch 希望我采取什么行动、实施或配置?

Spring 由于这一行,批处理开始了一个新的工作:

.incrementer( new RunIdIncrementer() )

这会为每个 运行 生成一个唯一的 ID。

如果你想重启一个作业,你必须确保你传递给作业的参数是相同的。所以你不能使用 RunIdIncrementer