强制 Spring Boot @Scheduled 方法在失败时被调用 "out of schedule"

Force Spring Boot @Scheduled method to be called "out of schedule" in case of failure

我有一个 Spring Boot (2.6.6) @Scheduled 方法,每 10 分钟从外部服务获取数据。

在正常情况下,我对间隔很满意。但是,如果数据获取失败(服务暂时不可用),我想一次性缩短它,并强制将方法安排得更早。

像这样:

@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
    try {
        this.data = myServiceConnector.fetchData();
    }
    catch (MyServiceNotAvailableException ex) {
        // temporarily set the scheduling delay so it will happen again in 5 seconds - HOWTO?
    }
}

N.B。如果它与问题相关,数据获取的行为是一个反应代码,所以实际上它看起来像

@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
    myServiceConnector.fetchData()
        .doOnSuccess(fetchedData -> this.data = fetchedData)
        .doOnError(throwable -> 
            // temporarily set the scheduling delay so it will happen again in 5 seconds - HOWTO?
        )
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
}

编辑:已更新以显示始终使用 AlwaysRetryPolicy(重试直至成功)。

看来您真正需要的只是集成到 @Scheduled 方法中的重试逻辑。 Spring Retry 是一个 Spring AOP 项目,可以在这方面为您提供帮助。您可以配置重试次数,以及重试前等待的延迟。大体上看起来像这样(启用它需要额外的配置):

@Bean
public RetryTemplate retryTemplate() {
   RetryTemplate retryTemplate = new RetryTemplate();
   AlwaysRetryPolicy policy = new AlwaysRetryPolicy();

   retryTemplate.setRetryPolicy(retryPolicy);


   return retryTemplate;
}
...

@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
    retryTemplate.execute(context -> {
        this.data = myServiceConnector.fetchData();
    });
}

该项目有更多关于 Github and here 的信息是来自 Baeldung 的指南。

我提出了以下解决方案。

  1. 一开始,我手动将fetchData()任务安排到运行马上。
  2. fetchData()Mono的订阅时间里,根据fetch操作的结果,我手动安排了下一个运行。由于这发生在操作结束时,它是一个 end-to-start 间隔,我不必解决重叠方法 运行s.
  3. 的潜在问题
@Service
@Slf4j
@RequiredArgsConstructor
public class DataUpdaterService {

    @Autowired
    private MyServiceConnector myServiceConnector;

    private TaskScheduler scheduler;
    private final Clock clock = Clock.systemUTC();

    @Getter
    private volatile Data data = null;

    @PostConstruct
    void init() {
        final ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
        scheduler = new ConcurrentTaskScheduler(localExecutor);
        // Schedule the task to run for the first time right now:
        scheduleFetchTask(Duration.ZERO);
    }

    private void scheduleFetchTask(Duration interval) {
        scheduler.schedule(this::fetchData, clock.instant().plus(interval));
    }

    private void fetchData() {
        myServiceConnector.fetchData()
            .doOnSuccess(fetchedData -> {
                this.data = fetchedData;
                // Schedule the task in the normal mode:
                scheduleFetchTask(Duration.ofMinutes(10));
            })
            .doOnError(throwable -> {
                log.error("Error fetching data from myService", throwable);
                // Schedule the task in the retry mode:
                scheduleFetchTask(Duration.ofSeconds(5));
            })
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();
    }
}

myServiceConnector.fetchData()的签名是

Mono<Data> fetchData()

为了完整起见,我将在此处保留另一个选项,但 looks the cleanest for his reactive case. Spring's Trigger 用于计算下一次执行时间。在这种情况下,它必须知道任务是否成功。我没有找到任何方法通过 API 通知触发器,我最好的想法是使用实​​例变量。

public class DynamicTrigger implements Trigger {

  private boolean isTaskSuccessFul = true;

  @Override
  public Date nextExecutionTime(TriggerContext triggerContext) {
    Date last = triggerContext.lastCompletionTime();
    Instant lastCompleted = last != null ? last.toInstant() : Instant.now();
    int seconds = this.isTaskSuccessFul ? 10 : 3;
    return Date.from(lastCompleted.plusSeconds(seconds));
  }

  public void setTaskSuccessFul(boolean taskSuccessFul) {
    this.isTaskSuccessFul = taskSuccessFul;
  }
}

任务,非常简单地模拟每两个任务失败:

public class ThrowingRunnable implements Runnable {

  private int count = 1;

  @Override
  public void run() {
    int current = this.count;
    if (this.count++ % 2 == 0) {
      System.out.println("Task failed - " + LocalDateTime.now() + " - " + current);
      throw new RuntimeException();
    }
    System.out.println("Task success - " + LocalDateTime.now() + " - " + current);
  }
}

另一个可运行的,包装任务本身,处理异常并通知触发器成功或失败:

public class WrappingRunnable implements Runnable {

  private final Runnable wrappedRunnable;
  private final DynamicTrigger trigger;

  public WrappingRunnable(Runnable wrappedRunnable, DynamicTrigger trigger) {
    this.wrappedRunnable = wrappedRunnable;
    this.trigger = trigger;
  }

  @Override
  public void run() {
    try {
      this.wrappedRunnable.run();
      this.trigger.setTaskSuccessFul(true);
    } catch (RuntimeException exc) {
      this.trigger.setTaskSuccessFul(false);
    }
  }
}

另一种可能性是通过错误处理程序通知触发器失败,但这需要使用自定义异常和转换来传输它,我认为这不值得。

并手动安排触发器:

@Configuration
@EnableScheduling
public class SchedulerConfig implements SchedulingConfigurer {

  @Override
  public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
    ConcurrentTaskScheduler taskScheduler = new ConcurrentTaskScheduler(Executors.newSingleThreadScheduledExecutor());
    taskScheduler.setErrorHandler(Throwable::printStackTrace);
    taskRegistrar.setTaskScheduler(taskScheduler);

    DynamicTrigger trigger = new DynamicTrigger();
    WrappingRunnable runnable = new WrappingRunnable(new ThrowingRunnable(), trigger);
    taskRegistrar.addTriggerTask(runnable, trigger);
  }
}

编辑: 将响应式代码包装到 Runnable 中可以像这样:

public class MonoRunnable implements Runnable {

  private final MyService myService;
  private final DynamicTrigger trigger;

  public MonoRunnable(MyService myService, DynamicTrigger trigger) {
    this.myService = myService;
    this.trigger = trigger;
  }

  @Override
  public void run() {
    this.myService.fetchData().doOnSuccess(data -> {
      //do stuff with data
      this.trigger.setTaskSuccessFul(true);
    }).doOnError(throwable -> {
      //do stuff with error
      this.trigger.setTaskSuccessFul(false);
    }).subscribe();
  }
}

我跳过了 subscribeOn(),因为我不确定 Worker 会产生什么影响,考虑到任务将使用 spring 调度程序进行调度。 On success 和 on error 函数将负责通知 Trigger,并且不需要 WrappingRunnable,我们可以直接注册单声道任务:

MonoRunnable runnable = new MonoRunnable(this.myService, trigger);
taskRegistrar.addTriggerTask(runnable, trigger);