Spring 云数据流任务在执行之间保留参数

Spring Cloud Data Flow Task Persist Arguments Between Executions

我正在尝试使用 SCDF 并成功地 运行 Spring 将批处理作业作为任务。但是我遇到了任务参数持续存在的问题。似乎每次我需要执行任务时,我都应该为它提供命令行参数。

在我的用例中,我需要为任务一次性设置命令行参数。

谢谢

这是设计使然。每次启动任务应用程序时都必须传递任务应用程序的 arguments,因为命令行参数并不意味着在后续任务启动之间传播。

只有您在上面作为 Parameters 传递的任务 deployment 属性被设计为在您启动后续任务启动时保留和重新使用。这些部署属性还包括任务 application 属性(通过 app. 前缀传递的属性)以及特定于平台的 deployer 属性(带有 deployer. 的属性前缀).

鉴于这些设计方面,我同意可能会有用例(如您的)在任务启动之间传递相同的参数。因此,我建议您用您的具体案例 here 创建一个故事,我们将重新审视设计以确定范围。

经过一些研究,我最终使用了参数而不是参数。

  1. 首先我创建了一个包含多个 CommandLineRunners 的 Spring 批处理(在我的例子中是 2 个),其中一个用于生产,它将使用将被 SCDF 参数覆盖的 "application properties" ,还有一个用于其他环境(DEV,...),将通过简单的命令行参数或通过 API.

  2. 启动
  3. 第一个 CommandLineRunner:

    @组件 @Slf4j @Profile("prod") public class ProdJobCommandLineRunner 实现 CommandLineRunner {

    @Value("${jobname}")
    private String jobName;
    
    @Value("${argument1}")
    private String argument1;
    
    @Value("${argument2}")
    private String argument2;
    
    @Autowired
    private ApplicationContext context;
    
    @Autowired
    private JobLauncher jobLauncher;
    
    @Override
    public void run(String... args) {
    
        log.info("Begin Launching Job with Args {}", Arrays.asList(args));
        log.error("JOB NAME: " + jobName);
    
        if (!CollectionUtils.isEmpty(Arrays.asList(args))) {
    
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    
            jobParametersBuilder.addString("argument1", argument1);
            jobParametersBuilder.addString("argument2", argument2);
    
            try {
                Job job = (Job) context.getBean(jobName);
    
                jobLauncher.run(job, jobParametersBuilder.toJobParameters());
    
            } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
                log.error("Exception ", e);
            }
        }
        log.info("End Launching Job with Args {}", Arrays.asList(args));
    }
    

    }

  4. 第二个 CommandLineRunner:

    @组件 @Slf4j @Profile("!prod") public class DefaultJobCommandLineRunner 实现 CommandLineRunner {

    @Autowired
    private ApplicationContext context;
    
    @Autowired
    private JobLauncher jobLauncher;
    
    @Override
    public void run(String... args) {
    
        log.info("Begin Launching Job with Args {}", Arrays.asList(args));
    
        if (!CollectionUtils.isEmpty(Arrays.asList(args))) {
    
            Map<String, String> params = parseJobArgs(args);
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    
            if (Boolean.parseBoolean(params.getOrDefault("force_restart", "false"))) {
                jobParametersBuilder.addString("force_restart", LocalDateTime.now().toString());
            }
    
            try {
    
                String jobName = params.get("job_name");
    
                log.info("JOB NAME: " + jobName);
    
                Job job = (Job) context.getBean(jobName);
    
                jobLauncher.run(job, jobParametersBuilder.toJobParameters());
    
            } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
                log.error("Exception ", e);
            }
        }
        log.info("End Launching Job with Args {}", Arrays.asList(args));
    }
    
    private Map<String, String> parseJobArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
    
        Arrays.asList(args).forEach(arg -> {
            String key = StringUtils.trimAllWhitespace(arg.split("=")[0]);
            String value = StringUtils.trimAllWhitespace(arg.split("=")[1]);
    
            params.put(key, value);
        });
        return params;
    }
    

    }

  5. 在 SCDF 中导入应用程序,例如 TESTAPP

  1. 使用相同的导入应用创建多个任务,具体取决于您有多少用例

  2. 对于首次启动的每个任务,请按照命名约定设置您拥有的参数: 应用程序。 "APP_NAME"。 "property key"="property value"

在这种情况下,例如它将是:app.TESTAPP.jobname=JOB_NAME

希望对您有所帮助。