强制 Spring Boot @Scheduled 方法在失败时被调用 "out of schedule"
Force Spring Boot @Scheduled method to be called "out of schedule" in case of failure
我有一个 Spring Boot (2.6.6) @Scheduled
方法,每 10 分钟从外部服务获取数据。
在正常情况下,我对间隔很满意。但是,如果数据获取失败(服务暂时不可用),我想一次性缩短它,并强制将方法安排得更早。
像这样:
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
try {
this.data = myServiceConnector.fetchData();
}
catch (MyServiceNotAvailableException ex) {
// temporarily set the scheduling delay so it will happen again in 5 seconds - HOWTO?
}
}
N.B。如果它与问题相关,数据获取的行为是一个反应代码,所以实际上它看起来像
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
myServiceConnector.fetchData()
.doOnSuccess(fetchedData -> this.data = fetchedData)
.doOnError(throwable ->
// temporarily set the scheduling delay so it will happen again in 5 seconds - HOWTO?
)
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
编辑:已更新以显示始终使用 AlwaysRetryPolicy(重试直至成功)。
看来您真正需要的只是集成到 @Scheduled
方法中的重试逻辑。 Spring Retry 是一个 Spring AOP 项目,可以在这方面为您提供帮助。您可以配置重试次数,以及重试前等待的延迟。大体上看起来像这样(启用它需要额外的配置):
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
AlwaysRetryPolicy policy = new AlwaysRetryPolicy();
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
...
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
retryTemplate.execute(context -> {
this.data = myServiceConnector.fetchData();
});
}
我提出了以下解决方案。
- 一开始,我手动将
fetchData()
任务安排到运行马上。
- 在
fetchData()
Mono
的订阅时间里,根据fetch操作的结果,我手动安排了下一个运行。由于这发生在操作结束时,它是一个 end-to-start 间隔,我不必解决重叠方法 运行s. 的潜在问题
@Service
@Slf4j
@RequiredArgsConstructor
public class DataUpdaterService {
@Autowired
private MyServiceConnector myServiceConnector;
private TaskScheduler scheduler;
private final Clock clock = Clock.systemUTC();
@Getter
private volatile Data data = null;
@PostConstruct
void init() {
final ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
scheduler = new ConcurrentTaskScheduler(localExecutor);
// Schedule the task to run for the first time right now:
scheduleFetchTask(Duration.ZERO);
}
private void scheduleFetchTask(Duration interval) {
scheduler.schedule(this::fetchData, clock.instant().plus(interval));
}
private void fetchData() {
myServiceConnector.fetchData()
.doOnSuccess(fetchedData -> {
this.data = fetchedData;
// Schedule the task in the normal mode:
scheduleFetchTask(Duration.ofMinutes(10));
})
.doOnError(throwable -> {
log.error("Error fetching data from myService", throwable);
// Schedule the task in the retry mode:
scheduleFetchTask(Duration.ofSeconds(5));
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
myServiceConnector.fetchData()
的签名是
Mono<Data> fetchData()
为了完整起见,我将在此处保留另一个选项,但 looks the cleanest for his reactive case. Spring's Trigger 用于计算下一次执行时间。在这种情况下,它必须知道任务是否成功。我没有找到任何方法通过 API 通知触发器,我最好的想法是使用实例变量。
public class DynamicTrigger implements Trigger {
private boolean isTaskSuccessFul = true;
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
Date last = triggerContext.lastCompletionTime();
Instant lastCompleted = last != null ? last.toInstant() : Instant.now();
int seconds = this.isTaskSuccessFul ? 10 : 3;
return Date.from(lastCompleted.plusSeconds(seconds));
}
public void setTaskSuccessFul(boolean taskSuccessFul) {
this.isTaskSuccessFul = taskSuccessFul;
}
}
任务,非常简单地模拟每两个任务失败:
public class ThrowingRunnable implements Runnable {
private int count = 1;
@Override
public void run() {
int current = this.count;
if (this.count++ % 2 == 0) {
System.out.println("Task failed - " + LocalDateTime.now() + " - " + current);
throw new RuntimeException();
}
System.out.println("Task success - " + LocalDateTime.now() + " - " + current);
}
}
另一个可运行的,包装任务本身,处理异常并通知触发器成功或失败:
public class WrappingRunnable implements Runnable {
private final Runnable wrappedRunnable;
private final DynamicTrigger trigger;
public WrappingRunnable(Runnable wrappedRunnable, DynamicTrigger trigger) {
this.wrappedRunnable = wrappedRunnable;
this.trigger = trigger;
}
@Override
public void run() {
try {
this.wrappedRunnable.run();
this.trigger.setTaskSuccessFul(true);
} catch (RuntimeException exc) {
this.trigger.setTaskSuccessFul(false);
}
}
}
另一种可能性是通过错误处理程序通知触发器失败,但这需要使用自定义异常和转换来传输它,我认为这不值得。
并手动安排触发器:
@Configuration
@EnableScheduling
public class SchedulerConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ConcurrentTaskScheduler taskScheduler = new ConcurrentTaskScheduler(Executors.newSingleThreadScheduledExecutor());
taskScheduler.setErrorHandler(Throwable::printStackTrace);
taskRegistrar.setTaskScheduler(taskScheduler);
DynamicTrigger trigger = new DynamicTrigger();
WrappingRunnable runnable = new WrappingRunnable(new ThrowingRunnable(), trigger);
taskRegistrar.addTriggerTask(runnable, trigger);
}
}
编辑: 将响应式代码包装到 Runnable
中可以像这样:
public class MonoRunnable implements Runnable {
private final MyService myService;
private final DynamicTrigger trigger;
public MonoRunnable(MyService myService, DynamicTrigger trigger) {
this.myService = myService;
this.trigger = trigger;
}
@Override
public void run() {
this.myService.fetchData().doOnSuccess(data -> {
//do stuff with data
this.trigger.setTaskSuccessFul(true);
}).doOnError(throwable -> {
//do stuff with error
this.trigger.setTaskSuccessFul(false);
}).subscribe();
}
}
我跳过了 subscribeOn()
,因为我不确定 Worker
会产生什么影响,考虑到任务将使用 spring 调度程序进行调度。 On success 和 on error 函数将负责通知 Trigger
,并且不需要 WrappingRunnable
,我们可以直接注册单声道任务:
MonoRunnable runnable = new MonoRunnable(this.myService, trigger);
taskRegistrar.addTriggerTask(runnable, trigger);
我有一个 Spring Boot (2.6.6) @Scheduled
方法,每 10 分钟从外部服务获取数据。
在正常情况下,我对间隔很满意。但是,如果数据获取失败(服务暂时不可用),我想一次性缩短它,并强制将方法安排得更早。
像这样:
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
try {
this.data = myServiceConnector.fetchData();
}
catch (MyServiceNotAvailableException ex) {
// temporarily set the scheduling delay so it will happen again in 5 seconds - HOWTO?
}
}
N.B。如果它与问题相关,数据获取的行为是一个反应代码,所以实际上它看起来像
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
myServiceConnector.fetchData()
.doOnSuccess(fetchedData -> this.data = fetchedData)
.doOnError(throwable ->
// temporarily set the scheduling delay so it will happen again in 5 seconds - HOWTO?
)
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
编辑:已更新以显示始终使用 AlwaysRetryPolicy(重试直至成功)。
看来您真正需要的只是集成到 @Scheduled
方法中的重试逻辑。 Spring Retry 是一个 Spring AOP 项目,可以在这方面为您提供帮助。您可以配置重试次数,以及重试前等待的延迟。大体上看起来像这样(启用它需要额外的配置):
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
AlwaysRetryPolicy policy = new AlwaysRetryPolicy();
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
...
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
retryTemplate.execute(context -> {
this.data = myServiceConnector.fetchData();
});
}
我提出了以下解决方案。
- 一开始,我手动将
fetchData()
任务安排到运行马上。 - 在
fetchData()
Mono
的订阅时间里,根据fetch操作的结果,我手动安排了下一个运行。由于这发生在操作结束时,它是一个 end-to-start 间隔,我不必解决重叠方法 运行s. 的潜在问题
@Service
@Slf4j
@RequiredArgsConstructor
public class DataUpdaterService {
@Autowired
private MyServiceConnector myServiceConnector;
private TaskScheduler scheduler;
private final Clock clock = Clock.systemUTC();
@Getter
private volatile Data data = null;
@PostConstruct
void init() {
final ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
scheduler = new ConcurrentTaskScheduler(localExecutor);
// Schedule the task to run for the first time right now:
scheduleFetchTask(Duration.ZERO);
}
private void scheduleFetchTask(Duration interval) {
scheduler.schedule(this::fetchData, clock.instant().plus(interval));
}
private void fetchData() {
myServiceConnector.fetchData()
.doOnSuccess(fetchedData -> {
this.data = fetchedData;
// Schedule the task in the normal mode:
scheduleFetchTask(Duration.ofMinutes(10));
})
.doOnError(throwable -> {
log.error("Error fetching data from myService", throwable);
// Schedule the task in the retry mode:
scheduleFetchTask(Duration.ofSeconds(5));
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
myServiceConnector.fetchData()
的签名是
Mono<Data> fetchData()
为了完整起见,我将在此处保留另一个选项,但
public class DynamicTrigger implements Trigger {
private boolean isTaskSuccessFul = true;
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
Date last = triggerContext.lastCompletionTime();
Instant lastCompleted = last != null ? last.toInstant() : Instant.now();
int seconds = this.isTaskSuccessFul ? 10 : 3;
return Date.from(lastCompleted.plusSeconds(seconds));
}
public void setTaskSuccessFul(boolean taskSuccessFul) {
this.isTaskSuccessFul = taskSuccessFul;
}
}
任务,非常简单地模拟每两个任务失败:
public class ThrowingRunnable implements Runnable {
private int count = 1;
@Override
public void run() {
int current = this.count;
if (this.count++ % 2 == 0) {
System.out.println("Task failed - " + LocalDateTime.now() + " - " + current);
throw new RuntimeException();
}
System.out.println("Task success - " + LocalDateTime.now() + " - " + current);
}
}
另一个可运行的,包装任务本身,处理异常并通知触发器成功或失败:
public class WrappingRunnable implements Runnable {
private final Runnable wrappedRunnable;
private final DynamicTrigger trigger;
public WrappingRunnable(Runnable wrappedRunnable, DynamicTrigger trigger) {
this.wrappedRunnable = wrappedRunnable;
this.trigger = trigger;
}
@Override
public void run() {
try {
this.wrappedRunnable.run();
this.trigger.setTaskSuccessFul(true);
} catch (RuntimeException exc) {
this.trigger.setTaskSuccessFul(false);
}
}
}
另一种可能性是通过错误处理程序通知触发器失败,但这需要使用自定义异常和转换来传输它,我认为这不值得。
并手动安排触发器:
@Configuration
@EnableScheduling
public class SchedulerConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ConcurrentTaskScheduler taskScheduler = new ConcurrentTaskScheduler(Executors.newSingleThreadScheduledExecutor());
taskScheduler.setErrorHandler(Throwable::printStackTrace);
taskRegistrar.setTaskScheduler(taskScheduler);
DynamicTrigger trigger = new DynamicTrigger();
WrappingRunnable runnable = new WrappingRunnable(new ThrowingRunnable(), trigger);
taskRegistrar.addTriggerTask(runnable, trigger);
}
}
编辑: 将响应式代码包装到 Runnable
中可以像这样:
public class MonoRunnable implements Runnable {
private final MyService myService;
private final DynamicTrigger trigger;
public MonoRunnable(MyService myService, DynamicTrigger trigger) {
this.myService = myService;
this.trigger = trigger;
}
@Override
public void run() {
this.myService.fetchData().doOnSuccess(data -> {
//do stuff with data
this.trigger.setTaskSuccessFul(true);
}).doOnError(throwable -> {
//do stuff with error
this.trigger.setTaskSuccessFul(false);
}).subscribe();
}
}
我跳过了 subscribeOn()
,因为我不确定 Worker
会产生什么影响,考虑到任务将使用 spring 调度程序进行调度。 On success 和 on error 函数将负责通知 Trigger
,并且不需要 WrappingRunnable
,我们可以直接注册单声道任务:
MonoRunnable runnable = new MonoRunnable(this.myService, trigger);
taskRegistrar.addTriggerTask(runnable, trigger);