如何在 Quarkus 中注入 KafkaTemplate

How to inject KafkaTemplate in Quarkus

我正在尝试注入一个 KafkaTemplate 来发送一条消息。我正在开发一个位于反应式方法之外的小函数。

我只能从 Smallrye 中找到使用 @Ingoing@Outgoing 的示例,但我不需要 KafkaStream

我尝试使用 Kafka-CDI,但无法注入 SimpleKafkaProducer

有什么想法吗?

Clement 的回答

这似乎是正确的方向,但是执行 orders.send("hello"); 我收到这个错误:

(vert.x-eventloop-thread-3) Unhandled exception:java.lang.IllegalStateException: Stream not yet connected

我正在通过命令行使用我的主题,Kafka 已启动并且 运行,如果我手动生成,我可以看到使用的消息。

似乎与文档的这句话有关:

To use an Emitter for the stream hello, you need a @Incoming("hello") somewhere in your code (or in your configuration).

我的 class 中有此代码:

    @Incoming("orders")
    public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
        log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
        return msg.ack();
    }

也许我忘记了一些配置?

因此,您只需要使用 Emitter:

@Inject
@Stream("orders") // Emit on the channel 'orders'
Emitter<String> orders;

// ...
orders.send("hello");

并在您的 application.properties 中声明:

## Orders topic (WRITE)
mp.messaging.outgoing.orders.type=io.smallrye.reactive.messaging.kafka.Kafka
mp.messaging.outgoing.orders.topic=orders
mp.messaging.outgoing.orders.bootstrap.servers=localhost:9092
mp.messaging.outgoing.orders.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.acks=1

为了避免 Stream not yet connected 异常,如 doc 所建议:

To use an Emitter for the stream hello, you need a @Incoming("hello") somewhere in your code (or in your configuration).

假设您的 application.properties 中有这样的内容:

# Orders topic (READ)
smallrye.messaging.source.orders-r-topic.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.orders-r-topic.topic=orders
smallrye.messaging.source.orders-r-topic.bootstrap.servers=0.0.0.0:9092
smallrye.messaging.source.orders-r-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.group.id=my-group-id

添加如下内容:

@Incoming("orders-r-topic")
public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
    log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
    return msg.ack();
}

Since the @Stream annotation has been deprecated. The @Channel annotation must be used instead.

您可以使用 quarkus-smallrye-reactive-messaging-kafka 依赖项提供的 Emitter 向 Kafka 主题生成消息。

一个简单的 Kafka 生产者实现:

public class MyKafkaProducer {

    @Inject
    @Channel("my-topic")
    Emitter<String> myEmitter;

    public void produce(String message) {
      myEmitter.send(message);
    }
}

并且必须将以下配置添加到 application.properties 文件中:

mp.messaging.outgoing.my-topic.connector=smallrye-kafka
mp.messaging.outgoing.my-topic.bootstrap.servers=localhost:9092
mp.messaging.outgoing.my-topic.value.serializer=org.apache.kafka.common.serialization.StringSerializer

这将向名为 my-topic.

的 kafka 主题生成字符串序列化消息

请注意,默认情况下,通道的名称也是将在其中生成数据的 kafka 主题的名称。可以通过配置更改此行为。受支持的配置属性在 reactive Messaging documentation

中有描述