安排 运行 每 1 分钟一个反应流
Schedule running a reactive stream for every 1 min
我有一个反应流,它获取一些数据,遍历数据,处理数据,最后将数据写入 Kafka
public Flux<M> sendData(){
Flux.fromIterable(o.getC()).publishOn(Schedulers.boundedElastic())
.flatMap(id->
Flux.fromIterable(getM(id)).publishOn(Schedulers.boundedElastic())
.flatMap( n -> {
return Flux.fromIterable(o.getD()).publishOn(Schedulers.boundedElastic())
.flatMap(d -> return Flux.just(sendToKafka));
})
)
.doOnError(throwable -> {
log.debug("Error while reading data : {} ", throwable.getMessage());
return;
})
.subscribe();
}
public void run(String... args){
sendData();
}
我希望此工作流程每分钟 运行。有人可以帮助我了解如何在流中安排这个吗?
如果你想每分钟 运行 做一些事情,你可以这样做。
Flux.interval(Duration.ofMinutes(1))
.onBackpressureDrop()
.flatMap(n -> sendData())
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
我有一个反应流,它获取一些数据,遍历数据,处理数据,最后将数据写入 Kafka
public Flux<M> sendData(){
Flux.fromIterable(o.getC()).publishOn(Schedulers.boundedElastic())
.flatMap(id->
Flux.fromIterable(getM(id)).publishOn(Schedulers.boundedElastic())
.flatMap( n -> {
return Flux.fromIterable(o.getD()).publishOn(Schedulers.boundedElastic())
.flatMap(d -> return Flux.just(sendToKafka));
})
)
.doOnError(throwable -> {
log.debug("Error while reading data : {} ", throwable.getMessage());
return;
})
.subscribe();
}
public void run(String... args){
sendData();
}
我希望此工作流程每分钟 运行。有人可以帮助我了解如何在流中安排这个吗?
如果你想每分钟 运行 做一些事情,你可以这样做。
Flux.interval(Duration.ofMinutes(1))
.onBackpressureDrop()
.flatMap(n -> sendData())
.subscribeOn(Schedulers.boundedElastic())
.subscribe()