Process List<String> with synchronous delay after flatMap?

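Hey, I've just started looking into reactive programming, and I can't figure out how to process a List<String> synchronously after a flatMap.

What I want to achieve:

  1. Fetch a list of domains from an external service
  2. Filter out the domains that already exist in the database
  3. Make another HTTP request to the external service for each remaining domain to get its domain info. These calls should run synchronously, one after another, with a Duration.ofSeconds(new Random().nextInt(5)) delay applied before each, like Thread.sleep, rather than in parallel.
  4. Store the new domain data in the database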
client.fetchDomainList() // Flux<DomainListResponse>
.flatMap(response -> Flux.fromIterable(response.getDomainList()))
.filter(hostname -> ! domainRepository.existsByHostname(hostname))
.collectList()

// this next bit is sketchy.
// flatMap won't work here (in my mind)
// because it would apply the delays in parallel
.map(list -> Flux.fromIterable(list)
    .map(hostname -> client.fetchDomainInfo(hostname)
        .delayElements(Duration.ofSeconds(new Random().nextInt(3))))
    .map(domainInfoResponse -> {
        return new Domain();
    })
)

.flatMap(s -> { // s -> Flux<Domain> here. Should be simply Domain
 // save into database?
})

You may want to wrap that synchronous call in a Mono.fromCallable, which will produce 0-1 items depending on whether the condition is met.

<T> Mono<T> checkDomain(T domain) {
   // if appropriate, consider adding subscribeOn to move the blocking call to a scheduler suited to blocking work, such as Schedulers.boundedElastic()
   return Mono.fromCallable(() -> {
      boolean filterMatches = ... your blocking HTTP request ...
      return filterMatches;
   }).flatMap(filterMatches -> filterMatches ? Mono.just(domain) : Mono.empty());
}
...

client.fetchDomainList() // Flux<DomainListResponse>
    .flatMap(response -> Flux.fromIterable(response.getDomainList()))
    .delayElements(/* your Duration */)
    // we are mapping 1 domain to 0-1 domains, depending on whether the condition is met
    .flatMap(domain -> checkDomain(domain))
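
Note that flatMap subscribes to its inner publishers eagerly, so the snippet above only keeps the calls apart as long as each one finishes within the delay. If the calls must strictly run one after another, each preceded by its own random pause, concatMap may be a better fit. Below is a minimal sketch, assuming Reactor is on the classpath. fetchDomainInfo is a hypothetical stand-in for client.fetchDomainInfo, and randomPause and fetchSequentially are illustrative names that do not appear in the original code.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SequentialFetchSketch {

    // Hypothetical stand-in for client.fetchDomainInfo(hostname).
    static Mono<String> fetchDomainInfo(String hostname) {
        return Mono.fromCallable(() -> "info for " + hostname);
    }

    // Picks a fresh pause between zero and maxPause (inclusive) on every call.
    static Duration randomPause(Duration maxPause) {
        long millis = ThreadLocalRandom.current().nextLong(maxPause.toMillis() + 1);
        return Duration.ofMillis(millis);
    }

    static Flux<String> fetchSequentially(Flux<String> hostnames, Duration maxPause) {
        // concatMap subscribes to the next inner Mono only after the previous one
        // has completed, so each pause and request starts after the one before it.
        return hostnames.concatMap(hostname ->
                Mono.delay(randomPause(maxPause)).then(fetchDomainInfo(hostname)));
    }

    public static void main(String[] args) {
        List<String> infos = fetchSequentially(
                Flux.just("a.example", "b.example", "c.example"),
                Duration.ofSeconds(5))
                .collectList()
                .block(); // blocking is acceptable here only because this is a demo main
        System.out.println(infos);
    }
}
```

Because the lambda passed to concatMap runs once per element, each hostname gets its own random pause, whereas a Duration passed directly to delayElements is computed once and reused for every element.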

Documentation for Flux.delayElements:

/**
 * Delay each of this {@link Flux} elements ({@link Subscriber#onNext} signals)
 * by a given {@link Duration}. Signals are delayed and continue on the
 * {@link Schedulers#parallel() parallel} default Scheduler, but empty sequences or
 * immediate error signals are not delayed.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/delayElements.svg" alt="">
 *
 * @param delay duration by which to delay each {@link Subscriber#onNext} signal
 * @return a delayed {@link Flux}
 * @see #delaySubscription(Duration) delaySubscription to introduce a delay at the beginning of the sequence only
 */
public final Flux<T> delayElements(Duration delay) {
    return delayElements(delay, Schedulers.parallel());
}

I put together a small PoC to test this myself and came up with the following:

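    import java.time.Duration;
    import java.time.LocalDateTime;
    import java.util.stream.Stream;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class DelayPoc {

        // Emits t if it passes the (simulated) check, otherwise completes empty.
        static Mono<Integer> verifyDomain(Integer t) {
            return Mono.fromCallable(() -> {
                Thread.sleep(100); // simulate HTTP request
                return t % 2 == 0; // is even?
            }).flatMap(condition -> condition ? Mono.just(t) : Mono.empty());
        }

        public static void main(String[] args) throws InterruptedException {
            Flux.fromStream(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                    .delayElements(Duration.ofSeconds(5)) // one element every 5 seconds
                    .flatMap(i -> verifyDomain(i))
                    .subscribe(i -> System.out.println(LocalDateTime.now() + " - " + i));

            Thread.sleep(100000); // just to stay alive
        }
    }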

And the output is as expected:

2021-10-19T16:58:28.644437500 - 2
2021-10-19T16:58:38.898397500 - 4
2021-10-19T16:58:49.130059 - 6
2021-10-19T16:58:59.376111100 - 8
2021-10-19T16:59:09.603982600 - 10