使用 cron 在 Spring 中安排周期性反应任务?
Scheduling periodic reactive tasks in Spring using cron?
通常我会做这样的事情来安排在 Spring 中使用 cron 在给定时区定期执行的作业:
@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
// Do stuff
}
这将阻止调用 scheduleStuff
的线程,直到作业完成。但是在这种情况下,我想做的 "stuff" 都是使用 Spring 的 project reactor 的非阻塞构建块实现的(即 Mono
、Flux
等).
例如假设我想定期触发此功能:
Flux<Void> stuff() {
return ..
}
我当然可以简单地调用stuff().subscribe()
(甚至stuff().block()
),但这会阻塞线程。对于非阻塞代码,是否有更好的方法来实现与 @Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
相同的功能?
我正在使用 Spring Boot 2.1。
您可以将 stuff
方法包装在异步方法中
例如:
@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
stuffService.doStuffAsync();
}
使用异步方法的服务
public class StuffService() implements IStuffService {
@Async
public void doStuffAsync() {
// Call and subscribe to your flux method here
}
}
对 doStuffAsync()
的调用将 return 立即 scheduleStuff
因此不会阻塞线程。
实际上,subscribe()
不会阻塞您的线程。如果确实需要,您可以调用 stuff().subscribeOn(Schedulers.parallel()).subscribe()
或其他调度程序以确保执行将在单独的线程中完成。
这里还有一个选项:
public class PeriodicReactiveTasksInSpring implements SmartLifecycle {
private final AtomicReference<Subscription> subscription;
private final Long executionPeriod;
public PeriodicReactiveTasksInSpring(Long executionPeriod) {
this.subscription = new AtomicReference<>();
this.executionPeriod = executionPeriod;
}
@Override
public void start() {
if (Objects.isNull(subscription.get())) {
updateConfig()
.doOnSubscribe(sub -> {
subscription.set(sub);
}).subscribe();
}
}
@Override
public void stop() {
Optional.ofNullable(subscription.get())
.ifPresent(sub -> {
sub.cancel();
subscription.set(null);
});
}
@Override
public boolean isRunning() {
return Objects.nonNull(subscription.get());
}
private Flux<Item> updateConfig() {
return Flux.interval(Duration.ofMillis(executionPeriod))
.subscribeOn(Schedulers.boundedElastic())
.flatMap(cfg -> {
// Do your job here
})
.onErrorContinue((err, msg) -> LOGGER.error("Error: {} message: {}", err, msg));
}
}
通常我会做这样的事情来安排在 Spring 中使用 cron 在给定时区定期执行的作业:
@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
// Do stuff
}
这将阻止调用 scheduleStuff
的线程,直到作业完成。但是在这种情况下,我想做的 "stuff" 都是使用 Spring 的 project reactor 的非阻塞构建块实现的(即 Mono
、Flux
等).
例如假设我想定期触发此功能:
Flux<Void> stuff() {
return ..
}
我当然可以简单地调用stuff().subscribe()
(甚至stuff().block()
),但这会阻塞线程。对于非阻塞代码,是否有更好的方法来实现与 @Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
相同的功能?
我正在使用 Spring Boot 2.1。
您可以将 stuff
方法包装在异步方法中
例如:
@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
stuffService.doStuffAsync();
}
使用异步方法的服务
public class StuffService() implements IStuffService {
@Async
public void doStuffAsync() {
// Call and subscribe to your flux method here
}
}
对 doStuffAsync()
的调用将 return 立即 scheduleStuff
因此不会阻塞线程。
实际上,subscribe()
不会阻塞您的线程。如果确实需要,您可以调用 stuff().subscribeOn(Schedulers.parallel()).subscribe()
或其他调度程序以确保执行将在单独的线程中完成。
这里还有一个选项:
public class PeriodicReactiveTasksInSpring implements SmartLifecycle {
private final AtomicReference<Subscription> subscription;
private final Long executionPeriod;
public PeriodicReactiveTasksInSpring(Long executionPeriod) {
this.subscription = new AtomicReference<>();
this.executionPeriod = executionPeriod;
}
@Override
public void start() {
if (Objects.isNull(subscription.get())) {
updateConfig()
.doOnSubscribe(sub -> {
subscription.set(sub);
}).subscribe();
}
}
@Override
public void stop() {
Optional.ofNullable(subscription.get())
.ifPresent(sub -> {
sub.cancel();
subscription.set(null);
});
}
@Override
public boolean isRunning() {
return Objects.nonNull(subscription.get());
}
private Flux<Item> updateConfig() {
return Flux.interval(Duration.ofMillis(executionPeriod))
.subscribeOn(Schedulers.boundedElastic())
.flatMap(cfg -> {
// Do your job here
})
.onErrorContinue((err, msg) -> LOGGER.error("Error: {} message: {}", err, msg));
}
}