如何从 quarkus 应用程序中正确地将逻辑删除消息发布到压缩的 kafka 主题?
How do I properly publish tombstone messages to compacted kafka topic from within a quarkus application?
来自 Quarkus application I need to publish tombstone messages to a compacted Apache Kafka topic. As my use-case is imperative I use an Emitter
for sending messages to the topic (as suggested in the quarkus blog)。 非逻辑删除 消息(带负载)的代码是:
@Dependent
public class Publisher {
@Inject
@Channel("theChannelName")
Emitter<MyDataStructure> emitter;
public CompletionStage<Void> publish(final MyDataStructure myData) {
OutgoingKafkaRecordMetadata<String> metadata =
OutgoingKafkaRecordMetadata.<String>builder()
.withKey(myData.getTopicKey())
.build();
return CompletableFuture.runAsync(
() -> emitter.send(Message.of(myData).addMetadata(metadata)));
}
}
Emitter
还实现了 <M extends Message<? extends T>> void send(M msg)
,我希望它能让我制作一个 Message
,负载为 null
作为墓碑消息。不幸的是,Message.of(..)
工厂方法的所有实现都允许提供元数据(提供消息密钥所需的元数据),指定 有效负载不得为 {@code null}.
使用 Emitter
将墓碑消息发布到 Kafka 主题的正确方法是什么(遵循 Quarkus / SmallRye Reactive Messaging 概念)?
我建议使用 Record
class(参见 documentation)。
一个Record
是一个key/value对,代表要写入的Kafka记录的key和value。两者都可以是 null
,但在您的情况下,只有值部分应该是 null
:Record.of(key, null);
.
所以,需要将Emitter的类型改为Record<Key, Value>
,如:
@Dependent
public class Publisher {
@Inject
@Channel("theChannelName")
Emitter<Record<Key, MyDataStructure>> emitter;
public CompletionStage<Void> publish(final MyDataStructure myData) {
return emitter.send(Record.of(myData.getTopicKey(), null);
}
}
虽然 runAsync
很方便,但发射器已经是异步的。所以,没必要用那个。此外,容器中的行为可能会很明显(如果您的并行度小于 2)。
我的代码返回了 send
方法的结果,即 CompletionStage
。当记录写入 Kafka(并被代理确认)时,该阶段将完成。
来自 Quarkus application I need to publish tombstone messages to a compacted Apache Kafka topic. As my use-case is imperative I use an Emitter
for sending messages to the topic (as suggested in the quarkus blog)。 非逻辑删除 消息(带负载)的代码是:
@Dependent
public class Publisher {
@Inject
@Channel("theChannelName")
Emitter<MyDataStructure> emitter;
public CompletionStage<Void> publish(final MyDataStructure myData) {
OutgoingKafkaRecordMetadata<String> metadata =
OutgoingKafkaRecordMetadata.<String>builder()
.withKey(myData.getTopicKey())
.build();
return CompletableFuture.runAsync(
() -> emitter.send(Message.of(myData).addMetadata(metadata)));
}
}
Emitter
还实现了 <M extends Message<? extends T>> void send(M msg)
,我希望它能让我制作一个 Message
,负载为 null
作为墓碑消息。不幸的是,Message.of(..)
工厂方法的所有实现都允许提供元数据(提供消息密钥所需的元数据),指定 有效负载不得为 {@code null}.
使用 Emitter
将墓碑消息发布到 Kafka 主题的正确方法是什么(遵循 Quarkus / SmallRye Reactive Messaging 概念)?
我建议使用 Record
class(参见 documentation)。
一个Record
是一个key/value对,代表要写入的Kafka记录的key和value。两者都可以是 null
,但在您的情况下,只有值部分应该是 null
:Record.of(key, null);
.
所以,需要将Emitter的类型改为Record<Key, Value>
,如:
@Dependent
public class Publisher {
@Inject
@Channel("theChannelName")
Emitter<Record<Key, MyDataStructure>> emitter;
public CompletionStage<Void> publish(final MyDataStructure myData) {
return emitter.send(Record.of(myData.getTopicKey(), null);
}
}
虽然 runAsync
很方便,但发射器已经是异步的。所以,没必要用那个。此外,容器中的行为可能会很明显(如果您的并行度小于 2)。
我的代码返回了 send
方法的结果,即 CompletionStage
。当记录写入 Kafka(并被代理确认)时,该阶段将完成。