How to autowire the ApplicationContext in a ScheduledFuture runnable task

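I am trying to create a module that runs report tasks based on configuration. The idea is to pass a report configuration to a single task, so that the configuration defines the details of each report. The report configuration contains the data information and the names of the classes that will be called to carry out the individual steps. For this I have a scheduling service that dynamically configures my scheduled tasks at startup. The task (a Runnable implementation) is ReportExecutor.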

@Service
public class ReportSchedulingService {
    private static final Logger logger = LoggerFactory.getLogger(ReportSchedulingService.class);

    @Autowired
    private ThreadPoolTaskScheduler taskScheduler;

    @Autowired
    private ReportList reportList;

    private static Map<String, ScheduledFuture<?>> jobsMap = new HashMap<String, ScheduledFuture<?>>();

    public void scheduleAllReports() {
        for (Map.Entry<String, Report> reportEntry : reportList.getReports().entrySet()) {
            String reportName = reportEntry.getKey();
            Report report = reportEntry.getValue();
            String jobId = UUID.randomUUID().toString();
            logger.info(String.format("Scheduling report [%s] with job id: [%s] and cron expression: [%s]",
                reportName, jobId, report.getCron()));
            ReportExecutor execution = new ReportExecutor();
            report.setId(jobId);
            report.setName(reportName);
            execution.setTaskDefinition(report);
            ScheduledFuture<?> scheduledTask = taskScheduler.schedule(execution,
                new CronTrigger(report.getCron(), TimeZone.getTimeZone(TimeZone.getDefault().getID())));
            getJobMap().put(reportName + "_" + jobId, scheduledTask);
        }
    }

    private synchronized Map<String, ScheduledFuture<?>> getJobMap() {
        if (jobsMap == null) {
            jobsMap = new HashMap<String, ScheduledFuture<?>>();
        }
        return jobsMap;
    }

    public void removeScheduledTask(String jobId) {
        ScheduledFuture<?> scheduledTask = getJobMap().get(jobId);
        if (scheduledTask != null) {
            scheduledTask.cancel(true);
            getJobMap().put(jobId, null);
        }
    }
}

The report executor is:

@Component
public class ReportExecutor implements Runnable {

    @Autowired
    private ApplicationContext appContext;

    private Report reportDefinition;

    @Override
    public void run() {
        SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
        System.out.println(reportDefinition);
        IReportGenerator generator = appContext.getBean(
            reportDefinition.getExecute(),
            IReportGenerator.class);
        JSONArray reportData = generator.generate(reportDefinition);

        IReportNotifier notifier =
            appContext.getBean(reportDefinition.getNotification().getExecute(), IReportNotifier.class);
        notifier.send(reportDefinition, reportData);

    }
    public Report getTaskDefinition() {
        return reportDefinition;
    }
    public void setTaskDefinition(Report reportDefinition) {
        this.reportDefinition = reportDefinition;
    }
}

But I have also tried:

@Component
public class ReportExecutor implements Runnable, ApplicationContextAware {

    private ApplicationContext appContext;

    private Report reportDefinition;

    @Override
    public void run() {
        SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
        System.out.println(reportDefinition);
        IReportGenerator generator = appContext.getBean(
            reportDefinition.getExecute(),
            IReportGenerator.class);
        JSONArray reportData = generator.generate(reportDefinition);

        IReportNotifier notifier =
            appContext.getBean(reportDefinition.getNotification().getExecute(), IReportNotifier.class);
        notifier.send(reportDefinition, reportData);

    }

    public Report getTaskDefinition() {
        return reportDefinition;
    }

    public void setTaskDefinition(Report reportDefinition) {
        this.reportDefinition = reportDefinition;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.appContext = applicationContext;
    }

}

It seems I cannot get the application context here, and I end up with an NPE at:

        IReportGenerator generator = appContext.getBean(

The stack trace is:

java.lang.NullPointerException: null
    at ote.itarc.report.ReportExecutor.run(ReportExecutor.java:27) ~[classes/:?]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.2.10.RELEASE.jar:5.2.10.RELEASE]
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93) [spring-context-5.2.10.RELEASE.jar:5.2.10.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:836) [?:?]

The ReportList object is populated from YAML properties:

reportlist:
  reports:
    missinginfra:
      desc: "This report provides ..."
      execute: QueryReportGenerator
      query: SELECT * FROM DUAL
      cron: "0 0/2 * 1/1 * ?"
      notification:
        execute: EmailCsvReportNotifier
        templateid: 0

with the following definition:

@ConfigurationProperties(prefix = "reportlist")
@Configuration
@EnableConfigurationProperties
public class ReportList {

    private Map<String, Report> reports;

    public Map<String, Report> getReports() {
        return reports;
    }

    public void setReports(Map<String, Report> reports) {
        this.reports = reports;
    }

    @Override
    public String toString() {
        return "ReportList [reports=" + reports + "]";
    }
}
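
The Report class itself is not shown in the post. As a rough sketch only, assuming its fields simply mirror the YAML keys above and the getters and setters called by the scheduling code (getCron, getExecute, getNotification, setId, setName), it could look like the following. The nested Notification class and all field types are guesses, not the author's code; in a real project these would normally be public classes.

```java
// Hypothetical sketch: Report is not shown in the original post. Field names
// are inferred from the YAML keys and from the accessors used in the code.
class Report {
    private String id;        // assigned by the scheduling service, not by YAML
    private String name;      // assigned by the scheduling service, not by YAML
    private String desc;
    private String execute;   // bean name of the IReportGenerator to run
    private String query;
    private String cron;
    private Notification notification;

    // Mirrors the nested "notification:" block in the YAML (assumed shape).
    static class Notification {
        private String execute;   // bean name of the IReportNotifier to run
        private int templateid;

        String getExecute() { return execute; }
        void setExecute(String execute) { this.execute = execute; }
        int getTemplateid() { return templateid; }
        void setTemplateid(int templateid) { this.templateid = templateid; }
    }

    String getId() { return id; }
    void setId(String id) { this.id = id; }
    String getName() { return name; }
    void setName(String name) { this.name = name; }
    String getDesc() { return desc; }
    void setDesc(String desc) { this.desc = desc; }
    String getExecute() { return execute; }
    void setExecute(String execute) { this.execute = execute; }
    String getQuery() { return query; }
    void setQuery(String query) { this.query = query; }
    String getCron() { return cron; }
    void setCron(String cron) { this.cron = cron; }
    Notification getNotification() { return notification; }
    void setNotification(Notification notification) { this.notification = notification; }

    @Override
    public String toString() {
        return "Report [name=" + name + ", execute=" + execute + ", cron=" + cron + "]";
    }
}
```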

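The NPE occurs because new ReportExecutor() creates the object outside of Spring, so neither @Autowired nor ApplicationContextAware is ever processed for that instance. Make ReportExecutor a Spring-managed bean and give it prototype scope. Then, in your ReportSchedulingService, use the ApplicationContext to obtain an instance (letting Spring do the wiring) and schedule it.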

@Service
public class ReportSchedulingService {
    private static final Logger logger = LoggerFactory.getLogger(ReportSchedulingService.class);

    private final Map<String, ScheduledFuture<?>> jobsMap = new ConcurrentHashMap<String, ScheduledFuture<?>>();

    private final TaskScheduler taskScheduler;
    private final ReportList reportList;
    private final ApplicationContext ctx;

    public ReportSchedulingService(ReportList reportList, TaskScheduler taskScheduler, ApplicationContext ctx) {
        this.reportList = reportList;
        this.taskScheduler = taskScheduler;
        this.ctx = ctx;
    }

    public void scheduleAllReports() {
        for (Map.Entry<String, Report> reportEntry : reportList.getReports().entrySet()) {
            String reportName = reportEntry.getKey();
            Report report = reportEntry.getValue();
            String jobId = UUID.randomUUID().toString();
            logger.info(String.format("Scheduling report [%s] with job id: [%s] and cron expression: [%s]",
                reportName, jobId, report.getCron()));
            ReportExecutor execution = ctx.getBean(ReportExecutor.class);
            report.setId(jobId);
            report.setName(reportName);
            execution.setTaskDefinition(report);
            ScheduledFuture<?> scheduledTask = taskScheduler.schedule(execution,
                new CronTrigger(report.getCron(), TimeZone.getTimeZone(TimeZone.getDefault().getID())));
            this.jobsMap.put(reportName + "_" + jobId, scheduledTask);
        }
    }

    public void removeScheduledTask(String jobId) {
        ScheduledFuture<?> scheduledTask = this.jobsMap.get(jobId);
        if (scheduledTask != null) {
            scheduledTask.cancel(true);
            // ConcurrentHashMap rejects null values, so remove the entry instead
            this.jobsMap.remove(jobId);
        }
    }
}

Note: I also took the liberty of improving your code a little, using constructor injection and a ConcurrentHashMap instead of synchronized.
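
One consequence of switching to ConcurrentHashMap is that it does not accept null values, so a cancelled job has to be removed from the map rather than overwritten with null. The following is a framework-free sketch of that bookkeeping, using java.util.concurrent directly in place of Spring's TaskScheduler. JobRegistry and its methods are hypothetical names made up for this illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the job map handling in ReportSchedulingService.
class JobRegistry {
    private final Map<String, ScheduledFuture<?>> jobs = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Schedules a repeating task and remembers its future under the given id.
    void schedule(String jobId, Runnable task, long periodMillis) {
        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
                task, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        jobs.put(jobId, future);
    }

    // Removes the entry and cancels the job; returns false for an unknown id.
    boolean cancel(String jobId) {
        ScheduledFuture<?> future = jobs.remove(jobId); // lookup and removal in one step
        if (future == null) {
            return false;
        }
        future.cancel(true);
        return true;
    }

    int size() {
        return jobs.size();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        JobRegistry registry = new JobRegistry();
        registry.schedule("report-1", () -> System.out.println("running"), 1000);
        System.out.println("cancelled: " + registry.cancel("report-1"));
        System.out.println("jobs left: " + registry.size());
        registry.shutdown();
    }
}
```

Using remove also keeps the map from accumulating entries for jobs that no longer exist.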

@Component
@Scope(scopeName = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ReportExecutor implements Runnable {

    private final ApplicationContext appContext;
    private Report reportDefinition;

    public ReportExecutor(ApplicationContext ctx) {
        this.appContext = ctx;
    }

    @Override
    public void run() {
        IReportGenerator generator = appContext.getBean(
            reportDefinition.getExecute(),
            IReportGenerator.class);
        JSONArray reportData = generator.generate(reportDefinition);

        IReportNotifier notifier =
            appContext.getBean(reportDefinition.getNotification().getExecute(), IReportNotifier.class);
        notifier.send(reportDefinition, reportData);

    }

    public void setTaskDefinition(Report reportDefinition) {
        this.reportDefinition = reportDefinition;
    }
}

Because the bean has prototype scope, every call to getBean creates a new instance. So for each execution you schedule, you get a fresh, clean instance.
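
To illustrate why a fresh instance per scheduled job matters, here is a small framework-free sketch. It is not Spring code: a Supplier plays the role of ctx.getBean(ReportExecutor.class) on a prototype bean, and DemoTask, PrototypeDemo and runAll are hypothetical names used only for this example. Each task carries its own definition, so one job's setTaskDefinition call cannot overwrite another's.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for ReportExecutor: one mutable definition per instance.
class DemoTask implements Runnable {
    private final List<String> sink;
    private String definition;

    DemoTask(List<String> sink) {
        this.sink = sink;
    }

    void setDefinition(String definition) {
        this.definition = definition;
    }

    @Override
    public void run() {
        sink.add(definition); // records which definition this instance ran with
    }
}

class PrototypeDemo {
    // Schedules one fresh task per definition and returns what the tasks recorded.
    static List<String> runAll(List<String> definitions) {
        List<String> sink = Collections.synchronizedList(new ArrayList<>());
        // Assumption: this supplier mimics getBean on a prototype-scoped bean.
        Supplier<DemoTask> prototype = () -> new DemoTask(sink);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            for (String definition : definitions) {
                DemoTask task = prototype.get(); // new instance for every job
                task.setDefinition(definition);
                scheduler.schedule(task, 10, TimeUnit.MILLISECONDS);
            }
            scheduler.shutdown(); // already scheduled one-shot tasks still run by default
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(sink);
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("daily", "weekly")));
    }
}
```

If a single shared instance were scheduled for every report instead, each setDefinition call would overwrite the previous one, which is the situation a singleton ReportExecutor would run into.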