Flux - parallel flatMap with WebClient - limit to a fixed batched rate
My code looks like this:
return Flux.fromIterable(new Generator()).log()
    .flatMap(
        s ->
            webClient
                .head()
                .uri(
                    MessageFormat.format(
                        "/my-{2,number,#00}.xml",
                        channel, timestamp, s))
                .exchangeToMono(r -> Mono.just(r.statusCode()))
                .filter(HttpStatus::is2xxSuccessful)
                .map(r -> s),
        6) // only request 6 segments in parallel via webClient
    .take(6) // we need only 6 200 OK responses
    .sort();
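A side note on the URI template in the snippet above: as posted, the pattern references only argument index 2, so `channel` and `timestamp` are passed to `MessageFormat` but never appear in the result. The sketch below shows what the pattern produces for a segment number. The class and method names (`SegmentPath`, `segmentPath`) are hypothetical, and `Locale.ROOT` is an assumption added here to keep the output independent of the machine's default locale.

```java
import java.text.MessageFormat;
import java.util.Locale;

class SegmentPath {

    // Formats the path with the same pattern as in the question.
    // Only argument index 2 (the segment number) is referenced by the pattern.
    static String segmentPath(Object channel, Object timestamp, int segment) {
        return new MessageFormat("/my-{2,number,#00}.xml", Locale.ROOT)
                .format(new Object[] {channel, timestamp, segment});
    }

    public static void main(String[] args) {
        System.out.println(segmentPath("chan", 1700000000L, 7));  // prints /my-07.xml
        System.out.println(segmentPath("other", 0L, 17));         // prints /my-17.xml
    }
}
```

If the channel and timestamp are meant to be part of the path, the pattern would need `{0}` and `{1}` placeholders as well.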
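It sends only HEAD requests, until the first 6 requests succeed.
Parallelization works here, but the problem is that as soon as 1 request completes, the next one is triggered immediately (to keep the parallelization level at 6). What I need is a parallelization level of 6, but in batches: trigger 6 requests, wait until all of them complete, then trigger the next 6, and so on.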
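To make the desired batch semantics concrete, here is a minimal plain-JDK sketch that deliberately uses `CompletableFuture` instead of Reactor, so it is an illustration of the behavior rather than the Reactor solution. All names (`BatchedProbe`, `probeInBatches`, the `probe` predicate standing in for the HEAD request) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;

class BatchedProbe {

    // Starts batchSize probes at once, waits until ALL of them have finished,
    // and only then starts the next batch. Stops after the batch in which the
    // number of successes reaches `wanted`, or when candidates run out.
    static List<Integer> probeInBatches(
            Iterator<Integer> candidates, int batchSize, int wanted, IntPredicate probe) {
        ExecutorService pool = Executors.newFixedThreadPool(batchSize);
        try {
            List<Integer> hits = new ArrayList<>();
            while (hits.size() < wanted && candidates.hasNext()) {
                // Pull the next batch from the source.
                List<Integer> batch = new ArrayList<>();
                while (batch.size() < batchSize && candidates.hasNext()) {
                    batch.add(candidates.next());
                }
                // Fire the whole batch concurrently.
                List<CompletableFuture<Boolean>> futures = new ArrayList<>();
                for (int c : batch) {
                    futures.add(CompletableFuture.supplyAsync(() -> probe.test(c), pool));
                }
                // Barrier: nothing new starts until every probe in the batch is done.
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
                for (int i = 0; i < batch.size(); i++) {
                    if (futures.get(i).join()) {
                        hits.add(batch.get(i));
                    }
                }
            }
            // Keep at most `wanted` hits (in source order) and sort them.
            List<Integer> result = new ArrayList<>(hits.subList(0, Math.min(wanted, hits.size())));
            Collections.sort(result);
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Pretend that only even segment numbers answer with 200 OK.
        List<Integer> ok = probeInBatches(
                IntStream.range(0, 30).boxed().iterator(), 6, 6, n -> n % 2 == 0);
        System.out.println(ok); // prints [0, 2, 4, 6, 8, 10]
    }
}
```

In this run exactly two batches (candidates 0 to 11) are probed. One difference from `take(6)` in the Reactor pipeline: this sketch keeps the first hits in source order, whereas `take` keeps whichever successes are emitted first.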
This is the output of log() above:
: | request(6)
: | onNext(7)
: | onNext(17)
: | onNext(27)
: | onNext(37)
: | onNext(47)
: | onNext(57)
: | request(1) <---- from here NOT OK; wait until all complete and request(6)
: | onNext(8)
: | request(1)
: | onNext(18)
: | request(1)
: | onNext(28)
: | request(1)
: | onNext(38)
: | request(1)
: | onNext(48)
: | request(1)
: | onNext(58)
: | cancel()
Update
Here is my attempt with buffer:
return Flux.fromIterable(new Generator())
    .buffer(6)
    .flatMap(Flux::fromIterable)
    .log()
    .flatMap(
        s ->
            webClient
                .head()
                .uri(
                    MessageFormat.format(
                        "/my-{2,number,#00}.xml",
                        channel, timestamp, s))
                .exchangeToMono(r -> Mono.just(r.statusCode()))
                .filter(HttpStatus::is2xxSuccessful)
                .map(r -> s),
        6) // only request 6 segments in parallel via webClient
    .take(6)
    .sort();
OK, it seems I have working code. Here I use window:
return Flux.fromIterable(new Generator())
    .window(6) // group the source into windows of 6, e.g. [0,1,2,3,4,5], [6,..,11], [12,..,17]
    .log()
    .flatMap(
        s -> s.log().flatMap(x -> webClient
                .head()
                .uri(
                    MessageFormat.format(
                        "/my-{2,number,#00}.xml",
                        channel, timestamp, x))
                .exchangeToMono(r -> Mono.just(r.statusCode()))
                .filter(HttpStatus::is2xxSuccessful)
                .map(r -> x), 6), // inner 6: run all requests of one window in parallel
        1) // outer 1: process only one window at a time
    .take(6, true) // pass through only 6 elements (cancel afterwards)
    .sort();