Quarkus 中的 Kafka 背压

Kafka back pressure in Quarkus

我正在开发一个项目,需要从 GCP Pub/Sub 接收事件、下载文件、解析和处理文件,最后将结果发布到 Kafka 代理,但我得到 SRMSG00034:下游请求不足以发出项目

在我第一次尝试发布到 Kafka 时,我使用流迭代 msgList,但我读到 Mutiny (https://quarkus.io/blog/mutiny-back-pressure/) 可以控制背压,但我遇到了同样的错误。

在我的场景中,我必须发布两个不同的列表,其中一个有大约 10k 条消息。 我读到我可以使用 @OnOverflow 控制溢出配置,但我更愿意保持默认配置,除非有必要进行一些更改。

Multi.createFrom().iterable(msgList)
    .onItem().transform(item -> {
        ... some transformation ...
    })
    .onItem().invoke(emitter::send)
    .subscribe().with(
            item -> Uni.createFrom().voidItem(),
            Throwable::printStackTrace,
            () -> System.out.println("Done!")
    );

你能指出正确的方向来解决这个问题吗?

提前致谢

emitter.send 是一种异步方法。在您的代码中,它会立即忽略结果和 send returns。那可能不是你想要的。我建议使用 MutinyEmitter 并执行 onItem().call(emitter::send)。在这种情况下,您将等待消息发送。请注意,如果发送消息失败,它将传播失败。这样,将应用背压,因为只有在确认前一个消息后,它才会收到一条新消息。

如果要发送一批消息,请使用.group().asList().of(...),然后发送列表中的所有消息。但是,和以前一样,您需要等待所有消息的确认。 Uni.join 会让你这样做。

如果您想并发发送消息但并发性受限,请使用:

.onItem().transformToUni(m -> emitter.send(m)).merge(concurrency)

它将并发发送 concurrency 条消息。但是,在这种情况下不能保证顺序。