Quarkus - 基于 kafka 写 ack 和 nack 响应状态
Quarkus - Responding with status based on kafka write ack and nack
我有一个将数据推送到 kafka 的端点。现在,我想在 kafka 写入成功或失败的情况下分别使用适当的状态代码 2xx 或 5xx 响应调用。代码片段是
@Path("/prices")
public class PriceResource {
@Inject @Channel("price-create") Emitter<Double> priceEmitter;
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void addPrice(Double price) {
priceEmitter.send(Message.of(price)
.withAck(() -> {
// Called when the message is acked
return CompletableFuture.completedFuture(null);
})
.withNack(throwable -> {
// Called when the message is nacked
return CompletableFuture.completedFuture(null);
}));
// return appropriate response
return Response
}
}
现在的问题是端点在执行 ack 或 nack 回调之前使用状态代码进行响应。
还尝试了 MutinyEmitter
的 sendAndAwait
方法,但该方法 returns 无效。所以没有办法知道消息是acked还是nacked。
此处最好的方法是链接异步操作,如下所示:
@POST
@Consumes(MediaType.TEXT_PLAIN)
public Uni<Response> addPrice(Double price) {
return Uni.createFrom().completionStage(priceEmitter.send(price))
.onItem().transform(ignored -> Response.ok().entity("foo").build())
.onFailure().recoverWithItem(Response.serverError().build());
}
如果你想使用同步代码(我不推荐):
@Blocking
@POST
@Consumes(MediaType.TEXT_PLAIN)
public Response addPrice(Double price) {
try {
Uni.createFrom().completionStage(priceEmitter.send(price))
.await().indefinitely();
return Response.ok().entity("foo").build();
} catch (Exception e) {
return Response.serverError().build();
}
}
如果 Uni 发出故障,.await().indefinitely() 将抛出异常。
您还可以选择直接使用发射器返回的 CompletionStage 而无需将其转换为 Uni,但请记住 Quarkus 选择 Mutiny 作为其默认的反应框架。
我有一个将数据推送到 kafka 的端点。现在,我想在 kafka 写入成功或失败的情况下分别使用适当的状态代码 2xx 或 5xx 响应调用。代码片段是
@Path("/prices")
public class PriceResource {
@Inject @Channel("price-create") Emitter<Double> priceEmitter;
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void addPrice(Double price) {
priceEmitter.send(Message.of(price)
.withAck(() -> {
// Called when the message is acked
return CompletableFuture.completedFuture(null);
})
.withNack(throwable -> {
// Called when the message is nacked
return CompletableFuture.completedFuture(null);
}));
// return appropriate response
return Response
}
}
现在的问题是端点在执行 ack 或 nack 回调之前使用状态代码进行响应。
还尝试了 MutinyEmitter
的 sendAndAwait
方法,但该方法 returns 无效。所以没有办法知道消息是acked还是nacked。
此处最好的方法是链接异步操作,如下所示:
@POST
@Consumes(MediaType.TEXT_PLAIN)
public Uni<Response> addPrice(Double price) {
return Uni.createFrom().completionStage(priceEmitter.send(price))
.onItem().transform(ignored -> Response.ok().entity("foo").build())
.onFailure().recoverWithItem(Response.serverError().build());
}
如果你想使用同步代码(我不推荐):
@Blocking
@POST
@Consumes(MediaType.TEXT_PLAIN)
public Response addPrice(Double price) {
try {
Uni.createFrom().completionStage(priceEmitter.send(price))
.await().indefinitely();
return Response.ok().entity("foo").build();
} catch (Exception e) {
return Response.serverError().build();
}
}
如果 Uni 发出故障,.await().indefinitely() 将抛出异常。
您还可以选择直接使用发射器返回的 CompletionStage 而无需将其转换为 Uni,但请记住 Quarkus 选择 Mutiny 作为其默认的反应框架。