Quarkus - 基于 kafka 写 ack 和 nack 响应状态

Quarkus - Responding with status based on kafka write ack and nack

我有一个将数据推送到 kafka 的端点。现在,我想在 kafka 写入成功或失败的情况下分别使用适当的状态代码 2xx 或 5xx 响应调用。代码片段是

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
         // return appropriate response
          return Response
    }
}

现在的问题是端点在执行 ack 或 nack 回调之前使用状态代码进行响应。 还尝试了 MutinyEmittersendAndAwait 方法,但该方法 returns 无效。所以没有办法知道消息是acked还是nacked。

此处最好的方法是链接异步操作,如下所示:

@POST
@Consumes(MediaType.TEXT_PLAIN)
public Uni<Response> addPrice(Double price) {
    return Uni.createFrom().completionStage(priceEmitter.send(price))
            .onItem().transform(ignored -> Response.ok().entity("foo").build())
            .onFailure().recoverWithItem(Response.serverError().build());
}

如果你想使用同步代码(我不推荐):

@Blocking
@POST
@Consumes(MediaType.TEXT_PLAIN)
public Response addPrice(Double price) {
    try {
        Uni.createFrom().completionStage(priceEmitter.send(price))
                .await().indefinitely();

        return Response.ok().entity("foo").build();
    } catch (Exception e) {
        return Response.serverError().build();
    }
}

如果 Uni 发出故障,.await().indefinitely() 将抛出异常。

您还可以选择直接使用发射器返回的 CompletionStage 而无需将其转换为 Uni,但请记住 Quarkus 选择 Mutiny 作为其默认的反应框架。