Quarkus/Smallrye 反应性 kafka - 端点 success/failure 来自消息的响应
Quarkus/Smallrye reactive kafka - Endpoint success/failure response from Message
我希望通过动态接受主题作为查询参数的 Success/Failure 响应来响应 REST 端点。在带有 smallrye 反应消息的 Quarkus 中,代码看起来像下面用 OutgoingKafkaRecordMetadata
包装有效负载
即https:///myendpoint/publishToKafka?topic=myDynamicTopic
@Channel("test")
Emitter<byte []> kafkaEmitter;
@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){
kafkaEmitter.send(Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
.withKey("my-key")
.withTopic("myDynamicTopic")
.build()));
}
来自 Quarkus doco "If the endpoint does not return a CompletionStage, the HTTP response may be written before the message is sent to Kafka, and so failures won’t be reported to the user." The example 描述了当您直接发送有效载荷时的这个过程(即 emitter.send(payload) 其中 returns 一个 CompletionStage 但 emitter.send(message) returns void) 但是这个需要提前配置topic。是否可以使用 Message 指定元数据并仍然使用 success/failure 响应来响应调用客户端? (我不介意它是否与 Emitter 和 CompletionStage 或 MunityEmitter 和 Uni 一起使用)。
如有任何意见或建议,我们将不胜感激。
因为你使用了一个消息(因为你需要指定主题),所以你需要一些更复杂的东西:
@Channel("test")
Emitter<byte []> kafkaEmitter;
@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){
CompletableFuture<Void> future = new CompletableFuture<>();
Message<byte[]> message = Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata.
<String>builder()
.withKey("my-key")
.withTopic("myDynamicTopic")
.build()));
message = message.withAck(() -> {
future.complete(null));
return CompleteableFuture.completedFuture(null);
}
.withNack(t -> {
future.completeExceptionnaly(t));
return CompleteableFuture.completedFuture(null);
});
kafkaEmitter.send(message);
return future;
}
在此代码段中,我还附加了消息被确认(被代理接受)或被拒绝(发生错误)时调用的 ack 和 nack 处理程序。
这些回调报告给 future
,一个在方法中创建的 CompletableFuture。这是 return 的对象,因为它会执行您想要的操作:指示结果。
我知道回调有点复杂。这主要是由于规范:我们必须 return CompleteableFuture.completedFuture(...);
确认 nack-process 成功。如果我们改为 return future;
(我们已将其设置为 future.completeExceptionnaly(t));
),这将被解释为 nack-process 期间的失败。这基本上等同于命令式世界中 catch
块中的 throw
。
幸运的是,一个更简单的版本将可用很快(不用担心,我们不会中断)。
我希望通过动态接受主题作为查询参数的 Success/Failure 响应来响应 REST 端点。在带有 smallrye 反应消息的 Quarkus 中,代码看起来像下面用 OutgoingKafkaRecordMetadata
包装有效负载即https:///myendpoint/publishToKafka?topic=myDynamicTopic
@Channel("test")
Emitter<byte []> kafkaEmitter;
@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){
kafkaEmitter.send(Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
.withKey("my-key")
.withTopic("myDynamicTopic")
.build()));
}
来自 Quarkus doco "If the endpoint does not return a CompletionStage, the HTTP response may be written before the message is sent to Kafka, and so failures won’t be reported to the user." The example
如有任何意见或建议,我们将不胜感激。
因为你使用了一个消息(因为你需要指定主题),所以你需要一些更复杂的东西:
@Channel("test")
Emitter<byte []> kafkaEmitter;
@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){
CompletableFuture<Void> future = new CompletableFuture<>();
Message<byte[]> message = Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata.
<String>builder()
.withKey("my-key")
.withTopic("myDynamicTopic")
.build()));
message = message.withAck(() -> {
future.complete(null));
return CompleteableFuture.completedFuture(null);
}
.withNack(t -> {
future.completeExceptionnaly(t));
return CompleteableFuture.completedFuture(null);
});
kafkaEmitter.send(message);
return future;
}
在此代码段中,我还附加了消息被确认(被代理接受)或被拒绝(发生错误)时调用的 ack 和 nack 处理程序。
这些回调报告给 future
,一个在方法中创建的 CompletableFuture。这是 return 的对象,因为它会执行您想要的操作:指示结果。
我知道回调有点复杂。这主要是由于规范:我们必须 return CompleteableFuture.completedFuture(...);
确认 nack-process 成功。如果我们改为 return future;
(我们已将其设置为 future.completeExceptionnaly(t));
),这将被解释为 nack-process 期间的失败。这基本上等同于命令式世界中 catch
块中的 throw
。
幸运的是,一个更简单的版本将可用很快(不用担心,我们不会中断)。