反应堆项目:doOnNext(或其他 doOnXXX)异步
Project Reactor: doOnNext (or the others doOnXXX) async
有没有类似doOnNext但异步的方法?
例如,我需要为确定的元素做一些长时间的日志记录(通过电子邮件发送通知)。
Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
.publishOn(myParallel)
.doOnNext(v -> {
// For example, we need to do something time-consuming only for 3
if (v.equals(3)) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("LOG FOR " + v);
});
ints.subscribe(System.out::println);
但是我为什么要等待 3 的记录?我想异步执行此逻辑。
现在我只有这个解决方案
Thread.sleep(10000);
Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Scheduler myParallel2 = Schedulers.newParallel("my-parallel2", 4);
Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
.publishOn(myParallel)
.doOnNext(v -> {
Mono.just(v).publishOn(myParallel2).subscribe(value -> {
if (value.equals(3)) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("LOG FOR " + value);
});
});
ints.subscribe(System.out::println);
有什么"nice"解决方案吗?
如果您完全确定不关心电子邮件发送是否成功,那么您可以使用"subscribe-inside-doOnNext",但我非常有信心会出错。
为了让您的 Flux
在 "logging" 失败时传播 onError
信号,推荐的方法是使用 flatMap
.
好消息是,由于 flatMap
将内部发布者的结果立即合并到主序列中,您仍然可以立即发出每个元素并触发电子邮件。唯一需要注意的是,只有在电子邮件发送 Mono
也完成后,整个事情才会 完成 。您还可以在 flatMap
lambda 中检查是否需要进行日志记录(而不是在内部 Mono
中):
//assuming sendEmail returns a Mono<Void>, takes care of offsetting any blocking send onto another Scheduler
source //we assume elements are also publishOn as relevant in `source`
.flatMap(v -> {
//if we can decide right away wether or not to send email, better do it here
if (shouldSendEmailFor(v)) {
//we want to immediately re-emit the value, then trigger email and wait for it to complete
return Mono.just(v)
.concatWith(
//since Mono<Void> never emits onNext, it is ok to cast it to V
//which makes it compatible with concat, keeping the whole thing a Flux<V>
sendEmail(v).cast(V.class)
);
} else {
return Mono.just(v);
}
});
Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
.flatMap(integer -> {
if (integer != 3) {
return Mono.just(integer)
.map(integer1 -> {
System.out.println(integer1);
return integer;
})
.subscribeOn(Schedulers.parallel());
} else {
return Mono.just(integer)
.delayElement(Duration.ofSeconds(3))
.map(integer1 -> {
System.out.println(integer1);
return integer;
})
.subscribeOn(Schedulers.parallel());
}
}
);
ints.subscribe();
有没有类似doOnNext但异步的方法? 例如,我需要为确定的元素做一些长时间的日志记录(通过电子邮件发送通知)。
Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
.publishOn(myParallel)
.doOnNext(v -> {
// For example, we need to do something time-consuming only for 3
if (v.equals(3)) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("LOG FOR " + v);
});
ints.subscribe(System.out::println);
但是我为什么要等待 3 的记录?我想异步执行此逻辑。
现在我只有这个解决方案
Thread.sleep(10000);
Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Scheduler myParallel2 = Schedulers.newParallel("my-parallel2", 4);
Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
.publishOn(myParallel)
.doOnNext(v -> {
Mono.just(v).publishOn(myParallel2).subscribe(value -> {
if (value.equals(3)) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("LOG FOR " + value);
});
});
ints.subscribe(System.out::println);
有什么"nice"解决方案吗?
如果您完全确定不关心电子邮件发送是否成功,那么您可以使用"subscribe-inside-doOnNext",但我非常有信心会出错。
为了让您的 Flux
在 "logging" 失败时传播 onError
信号,推荐的方法是使用 flatMap
.
好消息是,由于 flatMap
将内部发布者的结果立即合并到主序列中,您仍然可以立即发出每个元素并触发电子邮件。唯一需要注意的是,只有在电子邮件发送 Mono
也完成后,整个事情才会 完成 。您还可以在 flatMap
lambda 中检查是否需要进行日志记录(而不是在内部 Mono
中):
//assuming sendEmail returns a Mono<Void>, takes care of offsetting any blocking send onto another Scheduler
source //we assume elements are also publishOn as relevant in `source`
.flatMap(v -> {
//if we can decide right away wether or not to send email, better do it here
if (shouldSendEmailFor(v)) {
//we want to immediately re-emit the value, then trigger email and wait for it to complete
return Mono.just(v)
.concatWith(
//since Mono<Void> never emits onNext, it is ok to cast it to V
//which makes it compatible with concat, keeping the whole thing a Flux<V>
sendEmail(v).cast(V.class)
);
} else {
return Mono.just(v);
}
});
Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
.flatMap(integer -> {
if (integer != 3) {
return Mono.just(integer)
.map(integer1 -> {
System.out.println(integer1);
return integer;
})
.subscribeOn(Schedulers.parallel());
} else {
return Mono.just(integer)
.delayElement(Duration.ofSeconds(3))
.map(integer1 -> {
System.out.println(integer1);
return integer;
})
.subscribeOn(Schedulers.parallel());
}
}
);
ints.subscribe();