通过数据库中配置的参数动态配置 Spring Batch 作业的执行时间和参数
Dynamically configure Spring Batch execution time and parameters via database-configured parameters
我是 Spring Batch 新手,请帮帮我。以下是我的需求:
- 我已经编写了多个名称不同的 Spring Batch 作业。我想用不同的作业参数执行这些作业,并希望这些参数可以在数据库中动态配置,这样我就可以通过添加配置来新增不同作业名称、不同参数的作业执行。
- 另外,我想让各个作业在不同的时间执行,并且执行时间也可以通过 crontab 表达式来配置。
也许数据库结构是这样的:
- 编号
- task_name
- spring_batch_job_name
- cron_expression
不知道有没有大佬可以指导一下。非常感谢!
您可以创建一个 Util 类,(在 @PostConstruct 方法中)从数据库加载您的作业配置。
例如:
/**
 * Persistent entity (marked with {@code @Entity}) holding one configuration
 * entry as a field/value pair.
 */
@Entity
public class Configuration{
// Identifier of the entry (marked with @Id).
@Id
private long id;
// Name of the entry; ConfigurationUtil.getValueByField matches it ignoring case.
private String field;
// Value returned by ConfigurationUtil.getValueByField for the matching field.
private String value;
// Getters and setters are elided in this snippet; getField() and getValue()
// are the accessors called by ConfigurationUtil.
// getter and setter
}
/**
 * Repository for {@link Configuration} entities keyed by a {@code Long} id.
 * Declares no methods of its own; everything comes from {@code JpaRepository}.
 *
 * NOTE(review): {@code @Component} on a repository interface is presumably
 * redundant, since Spring Data usually registers such interfaces by itself —
 * confirm before removing.
 */
@Component
public interface ConfigurationRepo extends JpaRepository<Configuration, Long> {
}
/**
 * Static holder for the configuration entries loaded from the database.
 *
 * <p>The list is installed with {@link #setDefaultConfiguration(List)} and
 * queried with {@link #getValueByField(String)}.
 */
public final class ConfigurationUtil {

    /** Entries installed by {@link #setDefaultConfiguration(List)}; {@code null} until then. */
    private static List<Configuration> defaultConfiguration;

    private ConfigurationUtil() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * @return the defaultConfiguration, or {@code null} if none has been set yet
     */
    public static List<Configuration> getDefaultConfiguration() {
        return defaultConfiguration;
    }

    /**
     * @param defaultConfiguration the defaultConfiguration to set
     */
    public static void setDefaultConfiguration(List<Configuration> defaultConfiguration) {
        ConfigurationUtil.defaultConfiguration = defaultConfiguration;
    }

    /**
     * Returns the value of the first entry whose field name equals {@code field},
     * ignoring case.
     *
     * @param field the field name to look up
     * @return the value stored for that field (may be {@code null} if the entry has no value)
     * @throws IllegalStateException if no configuration list has been set yet
     * @throws java.util.NoSuchElementException if no entry matches {@code field}
     */
    public static String getValueByField(String field) {
        // Read the static reference once so the null check and the stream use the same list.
        List<Configuration> entries = defaultConfiguration;
        if (entries == null) {
            throw new IllegalStateException(
                    "Configuration has not been loaded yet; call setDefaultConfiguration first");
        }
        return entries.stream()
                // Skip null entries / null field names instead of failing with a bare NPE.
                .filter(entry -> entry != null
                        && entry.getField() != null
                        && entry.getField().equalsIgnoreCase(field))
                .findFirst()
                // Same exception type as Optional.get(), but with the missing key in the message.
                .orElseThrow(() -> new java.util.NoSuchElementException(
                        "No configuration entry found for field: " + field))
                .getValue();
    }
}
/**
 * Component whose {@code @PostConstruct} callback copies every
 * {@link Configuration} returned by the repository into {@link ConfigurationUtil}.
 */
@Component
public class ConfigurationContextInitializer {

    /** Repository the configuration entries are read from. */
    @Autowired
    ConfigurationRepo configurationRepo;

    /**
     * Fetches all configuration entries through the repository and installs
     * them as the default configuration of {@link ConfigurationUtil}.
     */
    @PostConstruct
    public void init() {
        final List<Configuration> loadedEntries = this.configurationRepo.findAll();
        ConfigurationUtil.setDefaultConfiguration(loadedEntries);
    }
}
// Example lookup: fetch the value stored for a given field name.
ConfigurationUtil.getValueByField("JOB_NAME"); // "JOB_NAME" is only an example; use a field name that exists in your table
这是我的作业配置实体:
/**
 * Entity mapped to table "report_tasks_manager" in schema "reconciliation";
 * each instance describes one schedulable task.
 *
 * NOTE(review): no @Id mapping or accessors are visible in this snippet —
 * presumably omitted for brevity; confirm against the real class.
 */
@Entity
@Table(name = "report_tasks_manager", schema = "reconciliation", catalog = "")
public class ReportTasksManager {
// Identifier of the task row.
private int id;
// Presumably a human-readable description of the task — TODO confirm.
private String taskDesc;
// Task name; QuartzManager uses it as the Quartz job and trigger name.
private String taskName;
// Cron expression; QuartzManager passes it to CronScheduleBuilder.cronSchedule.
private String cronExpression;
// Despite the field name, this holds a class name: QuartzManager loads it
// with Class.forName and uses the class as the Quartz job class.
private String methodName;
// Not read by the visible code; presumably the task's status — TODO confirm.
private int state;
// Not read by the visible code; presumably a concurrency flag — TODO confirm.
private Integer conCurrent;
// Reserved columns; unused in the visible code.
private String reserved1;
private String reserved2;
private String reserved3;
// Timestamps; not read by the visible code — exact semantics to be confirmed.
private Timestamp startTime;
private Timestamp endTime;
private Timestamp createTime;
}
我定义了一个实现 Job 接口的类,该类的 execute() 方法负责执行业务逻辑,比如启动一个 Spring Batch 作业:
/**
 * {@link Job} implementation that prints the current time and the task name of
 * the {@link ReportTasksManager} stored in the job data map under the key
 * "scheduleJob".
 */
public class QuartzJobFactory implements Job {

    /** Explicit public no-argument constructor. */
    public QuartzJobFactory() {
    }

    /**
     * Prints a timestamp, a start banner and the name of the task attached to
     * this execution.
     *
     * @param jobExecutionContext context whose merged job data map carries the
     *                            task under "scheduleJob"
     * @throws JobExecutionException declared by the {@link Job} contract
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        final Date now = new Date();
        System.out.println("time ={" + now + "}");
        System.out.println("starting job build factory");

        final Object storedTask = jobExecutionContext.getMergedJobDataMap().get("scheduleJob");
        final ReportTasksManager task = (ReportTasksManager) storedTask;
        final String taskName = task.getTaskName();
        System.out.println("job name = {" + taskName + "}");
    }
}
为了注册 cron 触发器,我定义了一个 REST 控制器,用来读取数据库中的作业参数并配置调度器:
/**
 * REST controller that registers or updates Quartz cron triggers from the task
 * rows stored in the database.
 *
 * <p>{@code GET test/schedule} reads every {@link ReportTasksManager} and, for
 * each one, either schedules a new job with a cron trigger or reschedules the
 * existing trigger with the row's current cron expression.
 */
@RestController
@RequestMapping(path = "test")
public class QuartzManager {

    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(QuartzManager.class.getName());

    private final SchedulerFactory schedulerFactory = new StdSchedulerFactory();

    @Autowired
    private ReportTaskManagerDAO reportTaskManagerDAO;

    /**
     * Loads all task definitions and applies each of them to the scheduler.
     * A task that cannot be configured is logged and skipped so that the
     * remaining tasks are still processed.
     */
    @GetMapping(value = "schedule")
    public void scheduleJob() {
        // Read settings from database
        List<ReportTasksManager> quartzList = reportTaskManagerDAO.findAll();
        if (quartzList == null || quartzList.isEmpty()) {
            return;
        }

        final Scheduler scheduler;
        try {
            scheduler = schedulerFactory.getScheduler();
        } catch (SchedulerException e) {
            LOGGER.log(java.util.logging.Level.SEVERE,
                    "Unable to obtain the Quartz scheduler; no tasks were scheduled", e);
            return;
        }

        for (ReportTasksManager reportTasksManager : quartzList) {
            try {
                configQuartz(reportTasksManager, scheduler);
            } catch (SchedulerException | ClassNotFoundException | RuntimeException e) {
                // RuntimeException covers an invalid cron expression, a class that is not a
                // Job, or missing fields; one bad row must not abort the remaining rows.
                String taskName = reportTasksManager == null ? "<null>" : reportTasksManager.getTaskName();
                LOGGER.log(java.util.logging.Level.SEVERE,
                        "Failed to schedule task '" + taskName + "'", e);
            }
        }
    }

    /**
     * Creates the job and cron trigger for a task, or updates the cron schedule
     * of its existing trigger, then makes sure the scheduler is started.
     *
     * <p>NOTE: when the trigger already exists only its schedule is replaced;
     * the "scheduleJob" entry stored in the job data map is not refreshed.
     *
     * @param reportTasksManager task definition supplying name, job class name and cron expression
     * @param scheduler          scheduler to register the job and trigger with
     * @throws SchedulerException     if the scheduler rejects the job or trigger
     * @throws ClassNotFoundException if the configured job class cannot be loaded
     * @throws ClassCastException     if the configured class does not implement {@link Job}
     */
    private void configQuartz(ReportTasksManager reportTasksManager, Scheduler scheduler)
            throws SchedulerException, ClassNotFoundException {
        TriggerKey triggerKey = TriggerKey.triggerKey(reportTasksManager.getTaskName(), Scheduler.DEFAULT_GROUP);
        // Parse the cron expression first so an invalid one fails before the scheduler is touched.
        CronScheduleBuilder scheduleBuilder =
                CronScheduleBuilder.cronSchedule(reportTasksManager.getCronExpression());

        // check if triggers already defined in scheduler
        CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
        if (trigger == null) {
            // not defined yet: create a new trigger and jobDetail
            // asSubclass verifies the class really is a Job, replacing the unchecked cast.
            Class<? extends Job> jobClass =
                    Class.forName(reportTasksManager.getMethodName()).asSubclass(Job.class);
            JobDetail jobDetail = JobBuilder.newJob(jobClass)
                    .withIdentity(reportTasksManager.getTaskName(), Scheduler.DEFAULT_GROUP)
                    .build();
            jobDetail.getJobDataMap().put("scheduleJob", reportTasksManager);
            trigger = TriggerBuilder.newTrigger()
                    .withIdentity(triggerKey)
                    .withSchedule(scheduleBuilder)
                    .build();
            scheduler.scheduleJob(jobDetail, trigger);
        } else {
            // already defined: update the schedule of the existing trigger
            trigger = trigger.getTriggerBuilder()
                    .withIdentity(triggerKey)
                    .withSchedule(scheduleBuilder)
                    .build();
            scheduler.rescheduleJob(triggerKey, trigger);
        }
        // Start in both branches: previously a run that only found existing
        // triggers never started the scheduler.
        scheduler.start();
    }
}
我是 Spring Batch 新手,请帮帮我。以下是我的需求:
- 我已经编写了多个名称不同的 Spring Batch 作业。我想用不同的作业参数执行这些作业,并希望这些参数可以在数据库中动态配置,这样我就可以通过添加配置来新增不同作业名称、不同参数的作业执行。
- 另外,我想让各个作业在不同的时间执行,并且执行时间也可以通过 crontab 表达式来配置。
也许数据库结构是这样的:
- 编号
- task_name
- spring_batch_job_name
- cron_expression
不知道有没有大佬可以指导一下。非常感谢!
您可以创建一个 Util 类,(在 @PostConstruct 方法中)从数据库加载您的作业配置。
例如:
/**
 * Persistent entity (marked with {@code @Entity}) holding one configuration
 * entry as a field/value pair.
 */
@Entity
public class Configuration{
// Identifier of the entry (marked with @Id).
@Id
private long id;
// Name of the entry; ConfigurationUtil.getValueByField matches it ignoring case.
private String field;
// Value returned by ConfigurationUtil.getValueByField for the matching field.
private String value;
// Getters and setters are elided in this snippet; getField() and getValue()
// are the accessors called by ConfigurationUtil.
// getter and setter
}
/**
 * Repository for {@link Configuration} entities keyed by a {@code Long} id.
 * Declares no methods of its own; everything comes from {@code JpaRepository}.
 *
 * NOTE(review): {@code @Component} on a repository interface is presumably
 * redundant, since Spring Data usually registers such interfaces by itself —
 * confirm before removing.
 */
@Component
public interface ConfigurationRepo extends JpaRepository<Configuration, Long> {
}
/**
 * Static holder for the configuration entries loaded from the database.
 *
 * <p>The list is installed with {@link #setDefaultConfiguration(List)} and
 * queried with {@link #getValueByField(String)}.
 */
public final class ConfigurationUtil {

    /** Entries installed by {@link #setDefaultConfiguration(List)}; {@code null} until then. */
    private static List<Configuration> defaultConfiguration;

    private ConfigurationUtil() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * @return the defaultConfiguration, or {@code null} if none has been set yet
     */
    public static List<Configuration> getDefaultConfiguration() {
        return defaultConfiguration;
    }

    /**
     * @param defaultConfiguration the defaultConfiguration to set
     */
    public static void setDefaultConfiguration(List<Configuration> defaultConfiguration) {
        ConfigurationUtil.defaultConfiguration = defaultConfiguration;
    }

    /**
     * Returns the value of the first entry whose field name equals {@code field},
     * ignoring case.
     *
     * @param field the field name to look up
     * @return the value stored for that field (may be {@code null} if the entry has no value)
     * @throws IllegalStateException if no configuration list has been set yet
     * @throws java.util.NoSuchElementException if no entry matches {@code field}
     */
    public static String getValueByField(String field) {
        // Read the static reference once so the null check and the stream use the same list.
        List<Configuration> entries = defaultConfiguration;
        if (entries == null) {
            throw new IllegalStateException(
                    "Configuration has not been loaded yet; call setDefaultConfiguration first");
        }
        return entries.stream()
                // Skip null entries / null field names instead of failing with a bare NPE.
                .filter(entry -> entry != null
                        && entry.getField() != null
                        && entry.getField().equalsIgnoreCase(field))
                .findFirst()
                // Same exception type as Optional.get(), but with the missing key in the message.
                .orElseThrow(() -> new java.util.NoSuchElementException(
                        "No configuration entry found for field: " + field))
                .getValue();
    }
}
/**
 * Component whose {@code @PostConstruct} callback copies every
 * {@link Configuration} returned by the repository into {@link ConfigurationUtil}.
 */
@Component
public class ConfigurationContextInitializer {

    /** Repository the configuration entries are read from. */
    @Autowired
    ConfigurationRepo configurationRepo;

    /**
     * Fetches all configuration entries through the repository and installs
     * them as the default configuration of {@link ConfigurationUtil}.
     */
    @PostConstruct
    public void init() {
        final List<Configuration> loadedEntries = this.configurationRepo.findAll();
        ConfigurationUtil.setDefaultConfiguration(loadedEntries);
    }
}
// Example lookup: fetch the value stored for a given field name.
ConfigurationUtil.getValueByField("JOB_NAME"); // "JOB_NAME" is only an example; use a field name that exists in your table
这是我的作业配置实体:
/**
 * Entity mapped to table "report_tasks_manager" in schema "reconciliation";
 * each instance describes one schedulable task.
 *
 * NOTE(review): no @Id mapping or accessors are visible in this snippet —
 * presumably omitted for brevity; confirm against the real class.
 */
@Entity
@Table(name = "report_tasks_manager", schema = "reconciliation", catalog = "")
public class ReportTasksManager {
// Identifier of the task row.
private int id;
// Presumably a human-readable description of the task — TODO confirm.
private String taskDesc;
// Task name; QuartzManager uses it as the Quartz job and trigger name.
private String taskName;
// Cron expression; QuartzManager passes it to CronScheduleBuilder.cronSchedule.
private String cronExpression;
// Despite the field name, this holds a class name: QuartzManager loads it
// with Class.forName and uses the class as the Quartz job class.
private String methodName;
// Not read by the visible code; presumably the task's status — TODO confirm.
private int state;
// Not read by the visible code; presumably a concurrency flag — TODO confirm.
private Integer conCurrent;
// Reserved columns; unused in the visible code.
private String reserved1;
private String reserved2;
private String reserved3;
// Timestamps; not read by the visible code — exact semantics to be confirmed.
private Timestamp startTime;
private Timestamp endTime;
private Timestamp createTime;
}
我定义了一个实现 Job 接口的类,该类的 execute() 方法负责执行业务逻辑,比如启动一个 Spring Batch 作业:
/**
 * {@link Job} implementation that prints the current time and the task name of
 * the {@link ReportTasksManager} stored in the job data map under the key
 * "scheduleJob".
 */
public class QuartzJobFactory implements Job {

    /** Explicit public no-argument constructor. */
    public QuartzJobFactory() {
    }

    /**
     * Prints a timestamp, a start banner and the name of the task attached to
     * this execution.
     *
     * @param jobExecutionContext context whose merged job data map carries the
     *                            task under "scheduleJob"
     * @throws JobExecutionException declared by the {@link Job} contract
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        final Date now = new Date();
        System.out.println("time ={" + now + "}");
        System.out.println("starting job build factory");

        final Object storedTask = jobExecutionContext.getMergedJobDataMap().get("scheduleJob");
        final ReportTasksManager task = (ReportTasksManager) storedTask;
        final String taskName = task.getTaskName();
        System.out.println("job name = {" + taskName + "}");
    }
}
为了注册 cron 触发器,我定义了一个 REST 控制器,用来读取数据库中的作业参数并配置调度器:
/**
 * REST controller that registers or updates Quartz cron triggers from the task
 * rows stored in the database.
 *
 * <p>{@code GET test/schedule} reads every {@link ReportTasksManager} and, for
 * each one, either schedules a new job with a cron trigger or reschedules the
 * existing trigger with the row's current cron expression.
 */
@RestController
@RequestMapping(path = "test")
public class QuartzManager {

    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(QuartzManager.class.getName());

    private final SchedulerFactory schedulerFactory = new StdSchedulerFactory();

    @Autowired
    private ReportTaskManagerDAO reportTaskManagerDAO;

    /**
     * Loads all task definitions and applies each of them to the scheduler.
     * A task that cannot be configured is logged and skipped so that the
     * remaining tasks are still processed.
     */
    @GetMapping(value = "schedule")
    public void scheduleJob() {
        // Read settings from database
        List<ReportTasksManager> quartzList = reportTaskManagerDAO.findAll();
        if (quartzList == null || quartzList.isEmpty()) {
            return;
        }

        final Scheduler scheduler;
        try {
            scheduler = schedulerFactory.getScheduler();
        } catch (SchedulerException e) {
            LOGGER.log(java.util.logging.Level.SEVERE,
                    "Unable to obtain the Quartz scheduler; no tasks were scheduled", e);
            return;
        }

        for (ReportTasksManager reportTasksManager : quartzList) {
            try {
                configQuartz(reportTasksManager, scheduler);
            } catch (SchedulerException | ClassNotFoundException | RuntimeException e) {
                // RuntimeException covers an invalid cron expression, a class that is not a
                // Job, or missing fields; one bad row must not abort the remaining rows.
                String taskName = reportTasksManager == null ? "<null>" : reportTasksManager.getTaskName();
                LOGGER.log(java.util.logging.Level.SEVERE,
                        "Failed to schedule task '" + taskName + "'", e);
            }
        }
    }

    /**
     * Creates the job and cron trigger for a task, or updates the cron schedule
     * of its existing trigger, then makes sure the scheduler is started.
     *
     * <p>NOTE: when the trigger already exists only its schedule is replaced;
     * the "scheduleJob" entry stored in the job data map is not refreshed.
     *
     * @param reportTasksManager task definition supplying name, job class name and cron expression
     * @param scheduler          scheduler to register the job and trigger with
     * @throws SchedulerException     if the scheduler rejects the job or trigger
     * @throws ClassNotFoundException if the configured job class cannot be loaded
     * @throws ClassCastException     if the configured class does not implement {@link Job}
     */
    private void configQuartz(ReportTasksManager reportTasksManager, Scheduler scheduler)
            throws SchedulerException, ClassNotFoundException {
        TriggerKey triggerKey = TriggerKey.triggerKey(reportTasksManager.getTaskName(), Scheduler.DEFAULT_GROUP);
        // Parse the cron expression first so an invalid one fails before the scheduler is touched.
        CronScheduleBuilder scheduleBuilder =
                CronScheduleBuilder.cronSchedule(reportTasksManager.getCronExpression());

        // check if triggers already defined in scheduler
        CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
        if (trigger == null) {
            // not defined yet: create a new trigger and jobDetail
            // asSubclass verifies the class really is a Job, replacing the unchecked cast.
            Class<? extends Job> jobClass =
                    Class.forName(reportTasksManager.getMethodName()).asSubclass(Job.class);
            JobDetail jobDetail = JobBuilder.newJob(jobClass)
                    .withIdentity(reportTasksManager.getTaskName(), Scheduler.DEFAULT_GROUP)
                    .build();
            jobDetail.getJobDataMap().put("scheduleJob", reportTasksManager);
            trigger = TriggerBuilder.newTrigger()
                    .withIdentity(triggerKey)
                    .withSchedule(scheduleBuilder)
                    .build();
            scheduler.scheduleJob(jobDetail, trigger);
        } else {
            // already defined: update the schedule of the existing trigger
            trigger = trigger.getTriggerBuilder()
                    .withIdentity(triggerKey)
                    .withSchedule(scheduleBuilder)
                    .build();
            scheduler.rescheduleJob(triggerKey, trigger);
        }
        // Start in both branches: previously a run that only found existing
        // triggers never started the scheduler.
        scheduler.start();
    }
}