Quarkus 中的 Kafka 背压
Kafka back pressure in Quarkus
我正在开发一个项目,需要从 GCP Pub/Sub 接收事件、下载文件、解析和处理文件,最后将结果发布到 Kafka 代理,但我得到 SRMSG00034:下游请求不足以发出项目。
在我第一次尝试发布到 Kafka 时,我使用流迭代 msgList,但我读到 Mutiny (https://quarkus.io/blog/mutiny-back-pressure/) 可以控制背压,但我遇到了同样的错误。
在我的场景中,我必须发布两个不同的列表,其中一个有大约 10k 条消息。
我读到我可以使用 @OnOverflow 控制溢出配置,但我更愿意保持默认配置,除非有必要进行一些更改。
Multi.createFrom().iterable(msgList)
.onItem().transform(item -> {
... some transformation ...
})
.onItem().invoke(emitter::send)
.subscribe().with(
item -> Uni.createFrom().voidItem(),
Throwable::printStackTrace,
() -> System.out.println("Done!")
);
你能指出正确的方向来解决这个问题吗?
提前致谢
emitter.send
是一种异步方法。在您的代码中,它会立即忽略结果和 send
returns。那可能不是你想要的。我建议使用 MutinyEmitter
并执行 onItem().call(emitter::send)
。在这种情况下,您将等待消息发送。请注意,如果发送消息失败,它将传播失败。这样,将应用背压,因为只有在确认前一个消息后,它才会收到一条新消息。
如果要发送一批消息,请使用.group().asList().of(...)
,然后发送列表中的所有消息。但是,和以前一样,您需要等待所有消息的确认。 Uni.join
会让你这样做。
如果您想并发发送消息但并发性受限,请使用:
.onItem().transformToUni(m -> emitter.send(m)).merge(concurrency)
它将并发发送 concurrency
条消息。但是,在这种情况下不能保证顺序。
我正在开发一个项目,需要从 GCP Pub/Sub 接收事件、下载文件、解析和处理文件,最后将结果发布到 Kafka 代理,但我得到 SRMSG00034:下游请求不足以发出项目。
在我第一次尝试发布到 Kafka 时,我使用流迭代 msgList,但我读到 Mutiny (https://quarkus.io/blog/mutiny-back-pressure/) 可以控制背压,但我遇到了同样的错误。
在我的场景中,我必须发布两个不同的列表,其中一个有大约 10k 条消息。 我读到我可以使用 @OnOverflow 控制溢出配置,但我更愿意保持默认配置,除非有必要进行一些更改。
Multi.createFrom().iterable(msgList)
.onItem().transform(item -> {
... some transformation ...
})
.onItem().invoke(emitter::send)
.subscribe().with(
item -> Uni.createFrom().voidItem(),
Throwable::printStackTrace,
() -> System.out.println("Done!")
);
你能指出正确的方向来解决这个问题吗?
提前致谢
emitter.send
是一种异步方法。在您的代码中,它会立即忽略结果和 send
returns。那可能不是你想要的。我建议使用 MutinyEmitter
并执行 onItem().call(emitter::send)
。在这种情况下,您将等待消息发送。请注意,如果发送消息失败,它将传播失败。这样,将应用背压,因为只有在确认前一个消息后,它才会收到一条新消息。
如果要发送一批消息,请使用.group().asList().of(...)
,然后发送列表中的所有消息。但是,和以前一样,您需要等待所有消息的确认。 Uni.join
会让你这样做。
如果您想并发发送消息但并发性受限,请使用:
.onItem().transformToUni(m -> emitter.send(m)).merge(concurrency)
它将并发发送 concurrency
条消息。但是,在这种情况下不能保证顺序。