从 SCDF 启动任务时如何解决异常 "java.lang.IllegalArgumentException: Invalid TaskExecution, ID 3"?

How to solve the execption "java.lang.IllegalArgumentException: Invalid TaskExecution, ID 3" when launching a task from SCDF?

我正在尝试通过 SCDF 运行 一个 Spring 批处理 jar。我在读取和写入时都使用不同的数据源 fpr(都是 Oracle DB)。我用来写的数据源是主数据源。我使用 Custom Build SCDF 来包含 oracle 驱动程序依赖项。下面是自定义 SCDF 项目位置。

dataflow-server-22x

我是本地 Spring 批处理项目 我实施了 DefaultTaskConfigurer 来提供我的主要数据源。因此,当我 运行 来自 IDE 的 Batch 项目时,项目 运行 很好,它从辅助数据源读取记录并写入主数据源。但是当我将批处理 jar 部署为自定义构建 SCDF 作为任务并启动它时,我收到一条错误消息,显示

org.springframework.context.ApplicationContextException: Failed to start bean 'taskLifecycleListener'; nested exception is java.lang.IllegalArgumentException: Invalid TaskExecution, ID 3 not found

当我检查任务执行 table(可以通过主数据源访问)时,任务执行 ID 在 table 中。但我仍然收到此错误。对于每个 运行,一个新的任务 ID 被插入到 Task_execution table 中,但是我收到了上面的错误消息,其中包含新插入的 task_execution id。 以下是项目详情:

Spring-boot-starter-parent : 2.2.5.RELEASE.
Spring-cloud-dataflow : 2.2.0.RELEASE.

我使用批处理作业 class 的实例从 Boot 的主要 class 加载我的所有 Batch_jobs,并且仅加载主要 class(启动所有作业)包含@EnableTask 注释。下面是我的 class 结构。

    @SpringBootApplication
    @EnableScheduling
    @EnableTask
    public class SpringBootMainApplication{
        @Autowired
        Job1Loader job1Loader;

        public static void main(String[] args) {
            SpringApplication.run(SpringBootMainApplication.class, args);
        }

        @Scheduled(cron = "0 */1 * * * ?")
        public void executeJob1Loader() throws Exception
        {
            JobParameters param = new JobParametersBuilder()
                                        .addString("JobID",         
                                     String.valueOf(System.currentTimeMillis()))
                                        .toJobParameters();
            jobLauncher.run(job1Loader.loadJob1(), param);
        }
    }

    //Job Config
    @Configuration
    @EnableBatchProcessing
    public class Job1Loader {
    @Bean
        public Job loadJob1()
        {
            return jobBuilderFactory().get("JOb1Loader")
                .incrementer(new RunIdIncrementer())
                .flow(step01())
                .end()
                .build();;//return job
    }

我在我的 Spring 工作项目中使用了两个不同的数据源,都是 oracle 数据源(不同的服务器)。我将其中一个标记为主要数据源并在我的 "DefaultTaskConfigurer" 自定义实现中使用了该数据源如下。

@Configuration
public class TaskConfig extends DefaultTaskConfigurer {
    @Autowired
    DatabaseConfig databaseConfig; 
    @Override
    public DataSource getTaskDataSource() {
        return databaseConfig.dataSource();//dataSource() returns the 
primary ds
    }
}

以下是我在 SCDF 自定义服务器和 Spring 批处理项目中使用的属性。

更新 - 1

**Spring batch Job :**
 spring.datasource.jdbc-url=jdbc:oracle:thin:@**MY_PRIMARY_DB**
 spring.datasource.username=db_user
 spring.datasource.password=db_pwd
 spring.datasource.driver-class-name=oracle.jdbc.OracleDriver

spring.datasource.jdbc-url=jdbc:oracle:thin:@**MY_SECONDARY_DB**
 spring.datasource.username=db_user
 spring.datasource.password=db_pwd
 spring.datasource.driver-class-name=oracle.jdbc.OracleDriver

**SCDF custom Server:**
 spring.datasource.url=jdbc:oracle:thin:@**MY_PRIMARY_DB**
 spring.datasource.username=db_user
 spring.datasource.password=db_pwd
 spring.datasource.driver-class-name=oracle.jdbc.OracleDriver

我的 Batch 应用程序使用两个数据库配置。一个读一个写。 因为来源和目的地不同。 由于 TASK_EXECUTION tables 是在 MY_PRIMARY_DB 数据库中创建的,因此我只传递主要数据库配置供 SCDF 读取和写入。因为读写发生在同一个DB中。

我尝试了这个问题的其他答案,但 none 奏效了。正如我之前所说,对此的任何意见都会有很大帮助。 谢谢。

我没有像上面那样覆盖 DefaultTaskConfigurer.getTaskDataSource() 方法,而是更改了 DefaultTaskConfigurer 实现,如下所示。我还不确定为什么覆盖方法 getTaskDataSource() 会导致问题。以下是对我有用的解决方案。

@Configuration
public class TaskConfig extends DefaultTaskConfigurer 
{

    Logger logger = LoggerFactory.getLogger(TaskConfig.class);

    Autowired
    public TaskConfig(@Qualifier("datasource1") DataSource dataSource) {
        super(dataSource); //"datasource1" is reference to the primary datasource.
    }
}