Quarkus/Smallrye 反应性 kafka - 端点 success/failure 来自消息的响应

Quarkus/Smallrye reactive kafka - Endpoint success/failure response from Message

我希望通过动态接受主题作为查询参数的 Success/Failure 响应来响应 REST 端点。在带有 smallrye 反应消息的 Quarkus 中,代码看起来像下面用 OutgoingKafkaRecordMetadata

包装有效负载

即https:///myendpoint/publishToKafka?topic=myDynamicTopic

@Channel("test")
Emitter<byte []> kafkaEmitter;

@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){

    kafkaEmitter.send(Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
            .withKey("my-key")
            .withTopic("myDynamicTopic")
            .build()));
    
}

来自 Quarkus doco "If the endpoint does not return a CompletionStage, the HTTP response may be written before the message is sent to Kafka, and so failures won’t be reported to the user." The example 描述了当您直接发送有效载荷时的这个过程(即 emitter.send(payload) 其中 returns 一个 CompletionStage 但 emitter.send(message) returns void) 但是这个需要提前配置topic。是否可以使用 Message 指定元数据并仍然使用 success/failure 响应来响应调用客户端? (我不介意它是否与 Emitter 和 CompletionStage 或 MunityEmitter 和 Uni 一起使用)。

如有任何意见或建议,我们将不胜感激。

因为你使用了一个消息(因为你需要指定主题),所以你需要一些更复杂的东西:

@Channel("test")
Emitter<byte []> kafkaEmitter;

@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){
    CompletableFuture<Void> future = new CompletableFuture<>();
    Message<byte[]> message = Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata. 
           <String>builder()
            .withKey("my-key")
            .withTopic("myDynamicTopic")
            .build()));
    message = message.withAck(() -> {
         future.complete(null));
         return CompleteableFuture.completedFuture(null);
    }
     .withNack(t -> {
       future.completeExceptionnaly(t));
       return CompleteableFuture.completedFuture(null);
    });
    kafkaEmitter.send(message);
    return future;    
}

在此代码段中,我还附加了消息被确认(被代理接受)或被拒绝(发生错误)时调用的 ack 和 nack 处理程序。

这些回调报告给 future,一个在方法中创建的 CompletableFuture。这是 return 的对象,因为它会执行您想要的操作:指示结果。

我知道回调有点复杂。这主要是由于规范:我们必须 return CompleteableFuture.completedFuture(...); 确认 nack-process 成功。如果我们改为 return future;(我们已将其设置为 future.completeExceptionnaly(t));),这将被解释为 nack-process 期间的失败。这基本上等同于命令式世界中 catch 块中的 throw

幸运的是,一个更简单的版本将可用很快(不用担心,我们不会中断)。