如何仅从反应流中发出累计和?

How to emit cumulative sum only from a reactive stream?

我有一个用例,其中流应该仅在累积“总和”等于或超过给定值 n 时发出。让我们以 n = 5 的六个整数为例。

+---+------+---------+
| i | Emit |   Sum   |
+---+------+---------+
| 1 |    - | 1       |
| 2 |    - | 3       |
| 3 |    5 | 1       |
| 4 |    5 | 0       |
| 5 |    5 | 0       |
| 2 |    2 | 0 (end) |
+---+------+---------+

如您所见,除非总和等于或超过 5,否则不会发出任何内容,但无论如何都会发出最后一个元素。

一旦发出一个项目,总和就会减少该值 (n)。实际上,我正在从网络调用中读取数据,然后将它们发送给只接受固定大小块的下游消费者,当然最后一个除外(上游已完成)。

我正在使用 Reactor 项目 Flux 作为 Publisher;我找不到任何方法可以让我做上面显示的事情。 scan 最接近,但它也会发出需要过滤掉的中间元素。

这不可能直接在 Flux 对象上执行,但如果您有权访问创建 Flux 对象的资源,则可能会实现解决方案。由于在流 (Flux) 内部,您无法访问前一个元素,您可以在索引上为您的资源创建 Flux,并直接从索引的 Flux 访问此资源(因为它是只读操作)。例如这样的事情:

List<Integer> list = List.of(1, 2, 3, 4, 5, 2);
AtomicReference<Integer> atomicSum = new AtomicReference<>(0);
return Flux.fromStream(IntStream.range(0, list.size() - 1).boxed())
        .flatMap(i -> {
            int sum = atomicSum.updateAndGet((integer -> integer + list.get(i)));
            if (sum >= 5) {
                atomicSum.updateAndGet(integer -> integer - 5);
                return Flux.just(5);
            }

            return (i.equals(list.size() -1))
                    ? Flux.just(list.get(i)) // emit last element even if sum was not 5
                    : Flux.empty();
        }); // emitted element's

请注意,这不是一个好的做法,我不建议这样的解决方案。 Flux 对象处理可能会在线程之间跳过,因此如果您在 Flux 之外修改对象,您应该以同步方式进行(因此使用 AtomicReference)。列表仅用于只读操作,因此没问题。我也不知道这部分代码是否真的有效,但我想向您展示如果您可以访问创建 Flux 对象的资源,您将如何找到解决方案。

编辑:即使这样的解决方案也行不通。我弄错了 myslef,Flux 对象不会在线程之间跳过,但可能会被多个线程处理,从而导致对无效状态的单个原子引用。这种云仍然可以通过一些同步机制来解决,比如锁而不是原子引用,但这远远超出了普通开发人员的经验。您确定不能使用 scan() 函数,因为您可以提供自己的累加器函数作为参数吗?

In reality, I'm reading data from a network call, and subsequently sending them to a downstream consumer who only accepts fixed size chunks, except for the last one, of course (upstream completed).

我突然想到,我自己尝试拆分响应 Flux 可能有点晚,而且相当困难;相反,我可以使用像 Netty FixedLengthFrameDecoder 这样的东西,它完全符合我的要求。

这让我找到了 reactor-netty 源代码,经过大量挖掘,我找到了我需要的东西。

fun get(url: String, maxChunkSize: Int): List<ByteArray> {
    return HttpClient.create()
        .httpResponseDecoder { it.maxChunkSize(maxChunkSize) }
        .get()
        .uri(url)
        .responseContent()
        .asByteArray()
        .collectList()
        .block()!!
}

关键的部分是httpResponseDecoder { it.maxChunkSize(maxChunkSize) };单元测试证明这是有效的:

@Test

fun testHonorsMaxChunkSize() {
    val maxChunkSize = 4096
    val chunks = FixedLengthResponseFrameClient.get(
        "http://doesnotexist.nowhere/binary", maxChunkSize
    )

    assertThat(chunks.subList(0, chunks.size - 1))
        .allMatch { it.size ==  maxChunkSize}
    assertThat(chunks.last().size).isLessThanOrEqualTo(maxChunkSize)
}

WebClient可以配置自定义HttpClient(配置为httpResponseDecoder)如下图:

WebClient
  .builder()
  .clientConnector(ReactorClientHttpConnector(httpClient))
  .build()
  .get()
  .uri("uri")
  .exchange()
  .flatMapMany { it.body(BodyExtractors.toDataBuffers()) }
  ...

这些缓冲区的大小将是 HttpClient.httpResponseDecoder 中设置的大小(默认为 8192 Kb)。