使用 cron 在 Spring 中安排周期性反应任务?

Scheduling periodic reactive tasks in Spring using cron?

通常我会做这样的事情来安排在 Spring 中使用 cron 在给定时区定期执行的作业:

@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
    // Do stuff
}

这将阻止调用 scheduleStuff 的线程,直到作业完成。但是在这种情况下,我想做的 "stuff" 都是使用 Spring 的 project reactor 的非阻塞构建块实现的(即 MonoFlux 等).

例如假设我想定期触发此功能:

Flux<Void> stuff() {
    return ..
}

我当然可以简单地调用stuff().subscribe()(甚至stuff().block()),但这会阻塞线程。对于非阻塞代码,是否有更好的方法来实现与 @Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm") 相同的功能?

我正在使用 Spring Boot 2.1。

您可以将 stuff 方法包装在异步方法中

例如:

@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
    stuffService.doStuffAsync();
}

使用异步方法的服务

public class StuffService() implements IStuffService {

    @Async
    public void doStuffAsync() {
       // Call and subscribe to your flux method here
    }

}

doStuffAsync() 的调用将 return 立即 scheduleStuff 因此不会阻塞线程。

实际上,subscribe() 不会阻塞您的线程。如果确实需要,您可以调用 stuff().subscribeOn(Schedulers.parallel()).subscribe() 或其他调度程序以确保执行将在单独的线程中完成。

这里还有一个选项:

public class PeriodicReactiveTasksInSpring implements SmartLifecycle {

    private final AtomicReference<Subscription> subscription;
    private final Long executionPeriod;

    public PeriodicReactiveTasksInSpring(Long executionPeriod) {
        this.subscription = new AtomicReference<>();
        this.executionPeriod = executionPeriod;
    }

    @Override
    public void start() {
        if (Objects.isNull(subscription.get())) {
            updateConfig()
                    .doOnSubscribe(sub -> {
                        subscription.set(sub);
                    }).subscribe();
        }
    }

    @Override
    public void stop() {
        Optional.ofNullable(subscription.get())
                .ifPresent(sub -> {
                    sub.cancel();
                    subscription.set(null);
                });
    }

    @Override
    public boolean isRunning() {
        return Objects.nonNull(subscription.get());
    }


    private Flux<Item> updateConfig() {
        return Flux.interval(Duration.ofMillis(executionPeriod))
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(cfg -> {
                    // Do your job here
                })
                .onErrorContinue((err, msg) -> LOGGER.error("Error: {} message: {}", err, msg));
    }

}