如何从 quarkus 应用程序中正确地将逻辑删除消息发布到压缩的 kafka 主题?

How do I properly publish tombstone messages to compacted kafka topic from within a quarkus application?

来自 Quarkus application I need to publish tombstone messages to a compacted Apache Kafka topic. As my use-case is imperative I use an Emitter for sending messages to the topic (as suggested in the quarkus blog)。 非逻辑删除 消息(带负载)的代码是:

@Dependent
public class Publisher {

  @Inject
  @Channel("theChannelName")
  Emitter<MyDataStructure> emitter;

  public CompletionStage<Void> publish(final MyDataStructure myData) {
    OutgoingKafkaRecordMetadata<String> metadata =
        OutgoingKafkaRecordMetadata.<String>builder()
            .withKey(myData.getTopicKey())
            .build();
    return CompletableFuture.runAsync(
        () -> emitter.send(Message.of(myData).addMetadata(metadata)));
  }
}

Emitter 还实现了 <M extends Message<? extends T>> void send(M msg),我希望它能让我制作一个 Message,负载为 null 作为墓碑消息。不幸的是,Message.of(..) 工厂方法的所有实现都允许提供元数据(提供消息密钥所需的元数据),指定 有效负载不得为 {@code null}.

使用 Emitter 将墓碑消息发布到 Kafka 主题的正确方法是什么(遵循 Quarkus / SmallRye Reactive Messaging 概念)?

我建议使用 Record class(参见 documentation)。 一个Record是一个key/value对,代表要写入的Kafka记录的key和value。两者都可以是 null,但在您的情况下,只有值部分应该是 nullRecord.of(key, null);.

所以,需要将Emitter的类型改为Record<Key, Value>,如:

@Dependent
public class Publisher {

  @Inject
  @Channel("theChannelName")
  Emitter<Record<Key, MyDataStructure>> emitter;

  public CompletionStage<Void> publish(final MyDataStructure myData) {
      return emitter.send(Record.of(myData.getTopicKey(), null);
  }
}

虽然 runAsync 很方便,但发射器已经是异步的。所以,没必要用那个。此外,容器中的行为可能会很明显(如果您的并行度小于 2)。

我的代码返回了 send 方法的结果,即 CompletionStage。当记录写入 Kafka(并被代理确认)时,该阶段将完成。