Quarkus中是否有向Kafka发送消息的功能
Is there any function in Quarkus to send message to Kafka
我是 kafka 和 quarkus 的新手,我想在处理完用户请求后向 kafka 主题发送消息。
我已经完成了 quarkus-quickstart 中提供的 kafka 示例。我试过 KafkaMessage
// when GET called send message to topic
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
generateSingle();
return "hello";
}
@Outgoing("single-stations")
public KafkaMessage<Integer, String> generateSingle() {
return KafkaMessage.of(1, "value");
};
但是我得到的结果是不断向kafka主题发送消息。
我想知道有没有其他方法或者我的代码有什么问题。
感谢帮助
此时有关此主题的文档简洁且不完整 (Quarkus 0.25.0)。我设法做到了,但它进行了大量实验,我认为这是一个 hack,希望在以后的 Quarkus 版本中得到补救。
原则是@Outgoing
方法必须产生一个stream被外部控制。这是通过在 @PostConstruct
方法中通过 Flowable.create()
创建流,并将发射器暴露给 class 成员来实现的。 @Outgoing
方法只是 returns 已经构建的流。
以下组件公开了一个 public 方法,produce(String message)
会将文本消息发送到 Kafka:
package ...
import java.util.UUID;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;
@ApplicationScoped
public class KafkaController {
private FlowableEmitter<KafkaMessage<String, String>> emitter;
private Flowable<KafkaMessage<String, String>> outgoingStream;
@PostConstruct
void init() {
outgoingStream = Flowable.create(emitter -> this.emitter = emitter, BackpressureStrategy.BUFFER);
}
public void produce(String message) {
emitter.onNext(KafkaMessage.of(UUID.randomUUID().toString(), message));
}
@PreDestroy
void dispose() {
emitter.onComplete();
}
@Outgoing("internal")
Publisher<KafkaMessage<String, String>> produceKafkaMessage() {
return outgoingStream;
}
@Incoming("internal")
@Outgoing("kafka-test")
KafkaMessage<String, String> transform(Message<KafkaMessage<String, String>> arg) {
return arg.getPayload();
}
}
我在生成的 Quarkus 应用程序中创建了这个 class,如记录 here:
mvn io.quarkus:quarkus-maven-plugin:0.25.0:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=kafka-quickstart \
-Dextensions="kafka"
并配置(application.properties
)如下:
kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.kafka-test.connector=smallrye-kafka
mp.messaging.outgoing.kafka-test.topic=test
mp.messaging.outgoing.kafka-test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kafka-test.value.serializer=org.apache.kafka.common.serialization.StringSerializer
Kafka 实例完全按照 quickstart 中的描述启动。您可以使用控制台侦听器观看 test
主题,如下所示:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test --from-beginning --group test-console.consumer
要测试它,您可以创建一个 JAX-RS 资源来调用 produce()
:
package ...
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@Path("/control")
public class KafkaProduceControlResource {
@Inject
KafkaController kafkaController;
@POST
@Path("/produce")
public void produceMessage(String message) {
kafkaController.produce(message);
}
}
如下从命令行调用它并观察控制台消费者:
curl -i -s -X POST -d "A text message" \
http://localhost:8080/control/produce
THE HACK: 似乎用 @Outgoing("kafka-test")
注释 produceKafkaMessage()
失败了,因为 Quarkus 不理解 KafkaMessage
is a Message
,并且正在将其包装在一个中,导致序列化错误。我正在使用 "internal"
流绕过它。
我是 kafka 和 quarkus 的新手,我想在处理完用户请求后向 kafka 主题发送消息。
我已经完成了 quarkus-quickstart 中提供的 kafka 示例。我试过 KafkaMessage
// when GET called send message to topic
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
generateSingle();
return "hello";
}
@Outgoing("single-stations")
public KafkaMessage<Integer, String> generateSingle() {
return KafkaMessage.of(1, "value");
};
但是我得到的结果是不断向kafka主题发送消息。
我想知道有没有其他方法或者我的代码有什么问题。
感谢帮助
此时有关此主题的文档简洁且不完整 (Quarkus 0.25.0)。我设法做到了,但它进行了大量实验,我认为这是一个 hack,希望在以后的 Quarkus 版本中得到补救。
原则是@Outgoing
方法必须产生一个stream被外部控制。这是通过在 @PostConstruct
方法中通过 Flowable.create()
创建流,并将发射器暴露给 class 成员来实现的。 @Outgoing
方法只是 returns 已经构建的流。
以下组件公开了一个 public 方法,produce(String message)
会将文本消息发送到 Kafka:
package ...
import java.util.UUID;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;
@ApplicationScoped
public class KafkaController {
private FlowableEmitter<KafkaMessage<String, String>> emitter;
private Flowable<KafkaMessage<String, String>> outgoingStream;
@PostConstruct
void init() {
outgoingStream = Flowable.create(emitter -> this.emitter = emitter, BackpressureStrategy.BUFFER);
}
public void produce(String message) {
emitter.onNext(KafkaMessage.of(UUID.randomUUID().toString(), message));
}
@PreDestroy
void dispose() {
emitter.onComplete();
}
@Outgoing("internal")
Publisher<KafkaMessage<String, String>> produceKafkaMessage() {
return outgoingStream;
}
@Incoming("internal")
@Outgoing("kafka-test")
KafkaMessage<String, String> transform(Message<KafkaMessage<String, String>> arg) {
return arg.getPayload();
}
}
我在生成的 Quarkus 应用程序中创建了这个 class,如记录 here:
mvn io.quarkus:quarkus-maven-plugin:0.25.0:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=kafka-quickstart \
-Dextensions="kafka"
并配置(application.properties
)如下:
kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.kafka-test.connector=smallrye-kafka
mp.messaging.outgoing.kafka-test.topic=test
mp.messaging.outgoing.kafka-test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kafka-test.value.serializer=org.apache.kafka.common.serialization.StringSerializer
Kafka 实例完全按照 quickstart 中的描述启动。您可以使用控制台侦听器观看 test
主题,如下所示:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test --from-beginning --group test-console.consumer
要测试它,您可以创建一个 JAX-RS 资源来调用 produce()
:
package ...
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@Path("/control")
public class KafkaProduceControlResource {
@Inject
KafkaController kafkaController;
@POST
@Path("/produce")
public void produceMessage(String message) {
kafkaController.produce(message);
}
}
如下从命令行调用它并观察控制台消费者:
curl -i -s -X POST -d "A text message" \
http://localhost:8080/control/produce
THE HACK: 似乎用 @Outgoing("kafka-test")
注释 produceKafkaMessage()
失败了,因为 Quarkus 不理解 KafkaMessage
is a Message
,并且正在将其包装在一个中,导致序列化错误。我正在使用 "internal"
流绕过它。