反应式消息传递:在需要时发出事件(使用 Kafka)
Reactive Messaging: Emit events when needed (using Kafka)
我正在学习 Quarkus 和 Reactive Messaging。我正在尝试在两个组件之间传递消息。我发现的示例演示了具有已知数据集的流,这些数据集被流式传输或连续重复有效负载。 (例如来自 Kafka Quickstart,它不断地流式传输一个新的随机数作为价格)
仅当业务逻辑中发生某些事件时,我才需要将事件放入流中。有例子吗?
我确实在 Whosebug 上找到了这个 post,。但是,有两个问题:
我无法使用此表格。
- 发射器始终为空。
- 我正在尝试纯粹使用 Reactive Messaging 来做到这一点,而不会从后台流过 Kafka
更新:
@iabughosh
谢谢。但我仍然得到一个空注入的发射器。以下是相关的代码片段:
mp.messaging.outgoing.ownercreated.connector=smallrye-kafka
mp.messaging.outgoing.ownercreated.topic=ownercreated
mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
`public class Owner {
@Inject
@Channel("ownercreated")
private static Emitter<Owner> ownerCreatedChannel;
public void persist() {
Owner.ownerCreatedChannel.send(this);
}
}`
我也注入了实例变量。
应@iabughosh 的要求更新#2 - 感谢您的帮助!
package org.boosey;
import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import java.util.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@ApplicationScoped
public class Owner {
private final Logger logger = Logger.getLogger(Owner.class.getName());
@Inject
@Channel("ownercreated")
private Emitter<Owner> ownerCreatedChannel;
public String name;
public String email;
public void persist() {
logger.info("IN PERSIST");
ownerCreatedChannel.send(this);
logger.info("SENT NEW OWNER");
}
}
application.properties:
mp.messaging.outgoing.ownercreated.connector=smallrye-kafka
mp.messaging.outgoing.ownercreated.topic=ownercreated
mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
正在从 Quarkus REST 资源 class 调用 Owner.persist 方法。我已验证 Owner.persist.
中收到了正确实例化的 Owner 对象
@Path("/owner")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@ApplicationScoped
public class OwnerResource {
@POST
public Response create(Owner owner) {
owner.persist();
return Response.status(201).build();
}
}
如果您在 application.properties 文件中正确配置了 outgoing
主题,您需要做的就是像这样注入 Emitter :
@Inject
@Channel("your-channel")
Emitter<String> outgoingChannel;
并且在您的函数中您可以调用:
outgoingChannel.send(msg);
您的频道在配置文件中的位置如下所示:
mp.messaging.outgoing.your-channel.topic=kafka-topic
更新:
将发射器(连同注释)代码移动到 OwnerResource,它应该可以顺利运行。如果您移动了该代码,您也可以从 Owner 中删除 @ApplicationScoped。这里发生的是 Owner 对象不是由 CDI 创建的,这就是它没有注入任何其他对象的原因。
问候。
我正在学习 Quarkus 和 Reactive Messaging。我正在尝试在两个组件之间传递消息。我发现的示例演示了具有已知数据集的流,这些数据集被流式传输或连续重复有效负载。 (例如来自 Kafka Quickstart,它不断地流式传输一个新的随机数作为价格)
仅当业务逻辑中发生某些事件时,我才需要将事件放入流中。有例子吗?
我确实在 Whosebug 上找到了这个 post,
我无法使用此表格。
- 发射器始终为空。
- 我正在尝试纯粹使用 Reactive Messaging 来做到这一点,而不会从后台流过 Kafka
更新: @iabughosh
谢谢。但我仍然得到一个空注入的发射器。以下是相关的代码片段:
mp.messaging.outgoing.ownercreated.connector=smallrye-kafka
mp.messaging.outgoing.ownercreated.topic=ownercreated
mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
`public class Owner {
@Inject
@Channel("ownercreated")
private static Emitter<Owner> ownerCreatedChannel;
public void persist() {
Owner.ownerCreatedChannel.send(this);
}
}`
我也注入了实例变量。
应@iabughosh 的要求更新#2 - 感谢您的帮助!
package org.boosey;
import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import java.util.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@ApplicationScoped
public class Owner {
private final Logger logger = Logger.getLogger(Owner.class.getName());
@Inject
@Channel("ownercreated")
private Emitter<Owner> ownerCreatedChannel;
public String name;
public String email;
public void persist() {
logger.info("IN PERSIST");
ownerCreatedChannel.send(this);
logger.info("SENT NEW OWNER");
}
}
application.properties:
mp.messaging.outgoing.ownercreated.connector=smallrye-kafka
mp.messaging.outgoing.ownercreated.topic=ownercreated
mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
正在从 Quarkus REST 资源 class 调用 Owner.persist 方法。我已验证 Owner.persist.
中收到了正确实例化的 Owner 对象@Path("/owner")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@ApplicationScoped
public class OwnerResource {
@POST
public Response create(Owner owner) {
owner.persist();
return Response.status(201).build();
}
}
如果您在 application.properties 文件中正确配置了 outgoing
主题,您需要做的就是像这样注入 Emitter :
@Inject
@Channel("your-channel")
Emitter<String> outgoingChannel;
并且在您的函数中您可以调用:
outgoingChannel.send(msg);
您的频道在配置文件中的位置如下所示:
mp.messaging.outgoing.your-channel.topic=kafka-topic
更新: 将发射器(连同注释)代码移动到 OwnerResource,它应该可以顺利运行。如果您移动了该代码,您也可以从 Owner 中删除 @ApplicationScoped。这里发生的是 Owner 对象不是由 CDI 创建的,这就是它没有注入任何其他对象的原因。 问候。