在 flatMap 之后具有同步延迟的进程列表 <String>?
Process List<String> with synchronous delay after flatMap?
嘿,我刚开始研究响应式编程,我不知道如何在 flatMap
之后以同步方式处理 List<String>
。
我想要达到的目标:
- 从外部服务获取域列表
- 过滤掉数据库中的现有域
- 向外部服务发出另一个 http 请求以获取域信息。这些调用 应该 以 同步 的方式执行,一个接一个地应用
Duration.ofSeconds(new Random().nextInt(5))
延迟,就像 Thread.sleep
而不是并行方式。
- 将新域数据存储到数据库中
client.fetchDomainList() // Flux<DomainListResponse>
.flatMap(response -> Flux.fromIterable(response.getDomainList()))
.filter(hostname -> ! domainRepository.existsByHostname(hostname))
.collectList()
// this next bit is sketchy.
// flatMap will doesn't work here (in my mind)
// because it will apply delay in parallel way
.map(list -> Flux.fromIterable(list)
.map(hostname -> client.fetchDomainInfo(hostname)
.delayElements(Duration.ofSeconds(new Random().nextInt(3))))
.map(domainInfoResponse -> {
return new Domain();
})
)
.flatMap(s -> { // s -> Flux<Domain> here. Should be simply Domain
// save into database?
})
您可能希望将该同步调用包装在一个 Mono.fromCallable
中,它会根据条件是否满足产生 0-1 个项目。
Mono<T> checkDomain(T domain) {
// consider adding (if appropriate) subscribeOn to switch to another scheduler suitable for this, such as parallel() perhaps
return Mono.fromCallable(() -> {
boolean filterMatches = ... your blocking HTTP request ...
return filterMatches;
}).flatMap(filterMatches -> filterMatches ? Mono.just(domain) : Mono.empty());
}
...
client.fetchDomainList() // Flux<DomainListResponse>
.flatMap(response -> Flux.fromIterable(response.getDomainList()))
// we are mapping 1 domain to 0-1 domain, depending on whether the condition is met.
.delayElements(/* your Duration */)
.flatMap(domain -> checkDomain(domain))
Flux.delayElements
的文档:
/**
* Delay each of this {@link Flux} elements ({@link Subscriber#onNext} signals)
* by a given {@link Duration}. Signals are delayed and continue on the
* {@link Schedulers#parallel() parallel} default Scheduler, but empty sequences or
* immediate error signals are not delayed.
*
* <p>
* <img class="marble" src="doc-files/marbles/delayElements.svg" alt="">
*
* @param delay duration by which to delay each {@link Subscriber#onNext} signal
* @return a delayed {@link Flux}
* @see #delaySubscription(Duration) delaySubscription to introduce a delay at the beginning of the sequence only
*/
public final Flux<T> delayElements(Duration delay) {
return delayElements(delay, Schedulers.parallel());
}
我做了一个小的 PoC 来亲自测试并提出以下内容
static Mono<Integer> verifyDomain(Integer t) {
return Mono.fromCallable(() -> {
Thread.sleep(100); // simulate HTTP request
return t % 2 == 0; // is even?
}).flatMap(condition -> condition ? Mono.just(t) : Mono.empty());
}
public static void main(String[] args) throws InterruptedException {
Flux.fromStream(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.delayElements(Duration.ofSeconds(5))
.flatMap(i -> verifyDomain(i))
.subscribe(i -> System.out.println(LocalDateTime.now() + " - " + i));
Thread.sleep(100000); // just to stay alive
}
并且输出符合预期:
2021-10-19T16:58:28.644437500 - 2
2021-10-19T16:58:38.898397500 - 4
2021-10-19T16:58:49.130059 - 6
2021-10-19T16:58:59.376111100 - 8
2021-10-19T16:59:09.603982600 - 10
嘿,我刚开始研究响应式编程,我不知道如何在 flatMap
之后以同步方式处理 List<String>
。
我想要达到的目标:
- 从外部服务获取域列表
- 过滤掉数据库中的现有域
- 向外部服务发出另一个 http 请求以获取域信息。这些调用 应该 以 同步 的方式执行,一个接一个地应用
Duration.ofSeconds(new Random().nextInt(5))
延迟,就像Thread.sleep
而不是并行方式。 - 将新域数据存储到数据库中
client.fetchDomainList() // Flux<DomainListResponse>
.flatMap(response -> Flux.fromIterable(response.getDomainList()))
.filter(hostname -> ! domainRepository.existsByHostname(hostname))
.collectList()
// this next bit is sketchy.
// flatMap will doesn't work here (in my mind)
// because it will apply delay in parallel way
.map(list -> Flux.fromIterable(list)
.map(hostname -> client.fetchDomainInfo(hostname)
.delayElements(Duration.ofSeconds(new Random().nextInt(3))))
.map(domainInfoResponse -> {
return new Domain();
})
)
.flatMap(s -> { // s -> Flux<Domain> here. Should be simply Domain
// save into database?
})
您可能希望将该同步调用包装在一个 Mono.fromCallable
中,它会根据条件是否满足产生 0-1 个项目。
Mono<T> checkDomain(T domain) {
// consider adding (if appropriate) subscribeOn to switch to another scheduler suitable for this, such as parallel() perhaps
return Mono.fromCallable(() -> {
boolean filterMatches = ... your blocking HTTP request ...
return filterMatches;
}).flatMap(filterMatches -> filterMatches ? Mono.just(domain) : Mono.empty());
}
...
client.fetchDomainList() // Flux<DomainListResponse>
.flatMap(response -> Flux.fromIterable(response.getDomainList()))
// we are mapping 1 domain to 0-1 domain, depending on whether the condition is met.
.delayElements(/* your Duration */)
.flatMap(domain -> checkDomain(domain))
Flux.delayElements
的文档:
/**
* Delay each of this {@link Flux} elements ({@link Subscriber#onNext} signals)
* by a given {@link Duration}. Signals are delayed and continue on the
* {@link Schedulers#parallel() parallel} default Scheduler, but empty sequences or
* immediate error signals are not delayed.
*
* <p>
* <img class="marble" src="doc-files/marbles/delayElements.svg" alt="">
*
* @param delay duration by which to delay each {@link Subscriber#onNext} signal
* @return a delayed {@link Flux}
* @see #delaySubscription(Duration) delaySubscription to introduce a delay at the beginning of the sequence only
*/
public final Flux<T> delayElements(Duration delay) {
return delayElements(delay, Schedulers.parallel());
}
我做了一个小的 PoC 来亲自测试并提出以下内容
static Mono<Integer> verifyDomain(Integer t) {
return Mono.fromCallable(() -> {
Thread.sleep(100); // simulate HTTP request
return t % 2 == 0; // is even?
}).flatMap(condition -> condition ? Mono.just(t) : Mono.empty());
}
public static void main(String[] args) throws InterruptedException {
Flux.fromStream(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.delayElements(Duration.ofSeconds(5))
.flatMap(i -> verifyDomain(i))
.subscribe(i -> System.out.println(LocalDateTime.now() + " - " + i));
Thread.sleep(100000); // just to stay alive
}
并且输出符合预期:
2021-10-19T16:58:28.644437500 - 2
2021-10-19T16:58:38.898397500 - 4
2021-10-19T16:58:49.130059 - 6
2021-10-19T16:58:59.376111100 - 8
2021-10-19T16:59:09.603982600 - 10