Spring 云数据流任务在执行之间保留参数
Spring Cloud Data Flow Task Persist Arguments Between Executions
我正在尝试使用 SCDF 并成功地 运行 Spring 将批处理作业作为任务。但是我遇到了任务参数持续存在的问题。似乎每次我需要执行任务时,我都应该为它提供命令行参数。
在我的用例中,我需要为任务一次性设置命令行参数。
谢谢
这是设计使然。每次启动任务应用程序时都必须传递任务应用程序的 arguments
,因为命令行参数并不意味着在后续任务启动之间传播。
只有您在上面作为 Parameters
传递的任务 deployment
属性被设计为在您启动后续任务启动时保留和重新使用。这些部署属性还包括任务 application
属性(通过 app.
前缀传递的属性)以及特定于平台的 deployer
属性(带有 deployer.
的属性前缀).
鉴于这些设计方面,我同意可能会有用例(如您的)在任务启动之间传递相同的参数。因此,我建议您用您的具体案例 here 创建一个故事,我们将重新审视设计以确定范围。
经过一些研究,我最终使用了参数而不是参数。
首先我创建了一个包含多个 CommandLineRunners 的 Spring 批处理(在我的例子中是 2 个),其中一个用于生产,它将使用将被 SCDF 参数覆盖的 "application properties" ,还有一个用于其他环境(DEV,...),将通过简单的命令行参数或通过 API.
启动
第一个 CommandLineRunner:
@组件
@Slf4j
@Profile("prod")
public class ProdJobCommandLineRunner 实现 CommandLineRunner {
@Value("${jobname}")
private String jobName;
@Value("${argument1}")
private String argument1;
@Value("${argument2}")
private String argument2;
@Autowired
private ApplicationContext context;
@Autowired
private JobLauncher jobLauncher;
@Override
public void run(String... args) {
log.info("Begin Launching Job with Args {}", Arrays.asList(args));
log.error("JOB NAME: " + jobName);
if (!CollectionUtils.isEmpty(Arrays.asList(args))) {
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString("argument1", argument1);
jobParametersBuilder.addString("argument2", argument2);
try {
Job job = (Job) context.getBean(jobName);
jobLauncher.run(job, jobParametersBuilder.toJobParameters());
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
log.error("Exception ", e);
}
}
log.info("End Launching Job with Args {}", Arrays.asList(args));
}
}
第二个 CommandLineRunner:
@组件
@Slf4j
@Profile("!prod")
public class DefaultJobCommandLineRunner 实现 CommandLineRunner {
@Autowired
private ApplicationContext context;
@Autowired
private JobLauncher jobLauncher;
@Override
public void run(String... args) {
log.info("Begin Launching Job with Args {}", Arrays.asList(args));
if (!CollectionUtils.isEmpty(Arrays.asList(args))) {
Map<String, String> params = parseJobArgs(args);
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
if (Boolean.parseBoolean(params.getOrDefault("force_restart", "false"))) {
jobParametersBuilder.addString("force_restart", LocalDateTime.now().toString());
}
try {
String jobName = params.get("job_name");
log.info("JOB NAME: " + jobName);
Job job = (Job) context.getBean(jobName);
jobLauncher.run(job, jobParametersBuilder.toJobParameters());
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
log.error("Exception ", e);
}
}
log.info("End Launching Job with Args {}", Arrays.asList(args));
}
private Map<String, String> parseJobArgs(String[] args) {
Map<String, String> params = new HashMap<>();
Arrays.asList(args).forEach(arg -> {
String key = StringUtils.trimAllWhitespace(arg.split("=")[0]);
String value = StringUtils.trimAllWhitespace(arg.split("=")[1]);
params.put(key, value);
});
return params;
}
}
在 SCDF 中导入应用程序,例如 TESTAPP
使用相同的导入应用创建多个任务,具体取决于您有多少用例
对于首次启动的每个任务,请按照命名约定设置您拥有的参数:
应用程序。 "APP_NAME"。 "property key"="property value"
在这种情况下,例如它将是:app.TESTAPP.jobname=JOB_NAME
希望对您有所帮助。
我正在尝试使用 SCDF 并成功地 运行 Spring 将批处理作业作为任务。但是我遇到了任务参数持续存在的问题。似乎每次我需要执行任务时,我都应该为它提供命令行参数。
在我的用例中,我需要为任务一次性设置命令行参数。
谢谢
这是设计使然。每次启动任务应用程序时都必须传递任务应用程序的 arguments
,因为命令行参数并不意味着在后续任务启动之间传播。
只有您在上面作为 Parameters
传递的任务 deployment
属性被设计为在您启动后续任务启动时保留和重新使用。这些部署属性还包括任务 application
属性(通过 app.
前缀传递的属性)以及特定于平台的 deployer
属性(带有 deployer.
的属性前缀).
鉴于这些设计方面,我同意可能会有用例(如您的)在任务启动之间传递相同的参数。因此,我建议您用您的具体案例 here 创建一个故事,我们将重新审视设计以确定范围。
经过一些研究,我最终使用了参数而不是参数。
首先我创建了一个包含多个 CommandLineRunners 的 Spring 批处理(在我的例子中是 2 个),其中一个用于生产,它将使用将被 SCDF 参数覆盖的 "application properties" ,还有一个用于其他环境(DEV,...),将通过简单的命令行参数或通过 API.
启动
第一个 CommandLineRunner:
@组件 @Slf4j @Profile("prod") public class ProdJobCommandLineRunner 实现 CommandLineRunner {
@Value("${jobname}") private String jobName; @Value("${argument1}") private String argument1; @Value("${argument2}") private String argument2; @Autowired private ApplicationContext context; @Autowired private JobLauncher jobLauncher; @Override public void run(String... args) { log.info("Begin Launching Job with Args {}", Arrays.asList(args)); log.error("JOB NAME: " + jobName); if (!CollectionUtils.isEmpty(Arrays.asList(args))) { JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); jobParametersBuilder.addString("argument1", argument1); jobParametersBuilder.addString("argument2", argument2); try { Job job = (Job) context.getBean(jobName); jobLauncher.run(job, jobParametersBuilder.toJobParameters()); } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) { log.error("Exception ", e); } } log.info("End Launching Job with Args {}", Arrays.asList(args)); }
}
第二个 CommandLineRunner:
@组件 @Slf4j @Profile("!prod") public class DefaultJobCommandLineRunner 实现 CommandLineRunner {
@Autowired private ApplicationContext context; @Autowired private JobLauncher jobLauncher; @Override public void run(String... args) { log.info("Begin Launching Job with Args {}", Arrays.asList(args)); if (!CollectionUtils.isEmpty(Arrays.asList(args))) { Map<String, String> params = parseJobArgs(args); JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); if (Boolean.parseBoolean(params.getOrDefault("force_restart", "false"))) { jobParametersBuilder.addString("force_restart", LocalDateTime.now().toString()); } try { String jobName = params.get("job_name"); log.info("JOB NAME: " + jobName); Job job = (Job) context.getBean(jobName); jobLauncher.run(job, jobParametersBuilder.toJobParameters()); } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) { log.error("Exception ", e); } } log.info("End Launching Job with Args {}", Arrays.asList(args)); } private Map<String, String> parseJobArgs(String[] args) { Map<String, String> params = new HashMap<>(); Arrays.asList(args).forEach(arg -> { String key = StringUtils.trimAllWhitespace(arg.split("=")[0]); String value = StringUtils.trimAllWhitespace(arg.split("=")[1]); params.put(key, value); }); return params; }
}
在 SCDF 中导入应用程序,例如 TESTAPP
使用相同的导入应用创建多个任务,具体取决于您有多少用例
对于首次启动的每个任务,请按照命名约定设置您拥有的参数: 应用程序。 "APP_NAME"。 "property key"="property value"
在这种情况下,例如它将是:app.TESTAPP.jobname=JOB_NAME
希望对您有所帮助。