Uni 等待 Vertx eventBus 消息
Uni wait for Vertx eventBus message
我有两个端点:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/waitForEvent")
public Uni<Object> waitForEvent() {
return Uni.createFrom().emitter(em -> {
//wait for event from eventBus
// eventBus.consumer("test", msg -> {
// System.out.printf("receive event: %s\n", msg.body());
// em.complete(msg);
// });
}).ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
}
@GET
@Path("/send")
public void test() {
System.out.println("send event");
eventBus.send("test", "send test event");
}
只有当它从 eventBus 接收到事件时,waitForEvent() 才应该完成。如何使用 vertx 和 mutiny 实现此目的?
一般来说,我们会避免这种模式并使用事件总线中的request/reply机制:
@GET
@Path("/send")
public Uni<String> test() {
return bus.<String>request("test", name)
.onItem().transform(Message::body)
.ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
}
当使用两个端点(如问题中)实施时,它可能会变得有点复杂,就好像您多次调用 /waitForEvent
端点一样,您需要确保每个“消费者”都得到消息。
这仍然是可能的,但需要这样的东西:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/waitForEvent")
public Uni<String> waitForEvent() {
return Uni.createFrom().emitter(emitter -> {
MessageConsumer<String> consumer = bus.consumer("test");
consumer.handler(m -> {
emitter.complete(m.body());
consumer.unregisterAndForget();
})
.ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
}
@GET
@Path("/send")
public void test() {
bus.publish("test", "send test event");
}
请务必使用事件总线的 io.vertx.mutiny.core.eventbus.EventBus
变体。
我有两个端点:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/waitForEvent")
public Uni<Object> waitForEvent() {
return Uni.createFrom().emitter(em -> {
//wait for event from eventBus
// eventBus.consumer("test", msg -> {
// System.out.printf("receive event: %s\n", msg.body());
// em.complete(msg);
// });
}).ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
}
@GET
@Path("/send")
public void test() {
System.out.println("send event");
eventBus.send("test", "send test event");
}
只有当它从 eventBus 接收到事件时,waitForEvent() 才应该完成。如何使用 vertx 和 mutiny 实现此目的?
一般来说,我们会避免这种模式并使用事件总线中的request/reply机制:
@GET
@Path("/send")
public Uni<String> test() {
return bus.<String>request("test", name)
.onItem().transform(Message::body)
.ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
}
当使用两个端点(如问题中)实施时,它可能会变得有点复杂,就好像您多次调用 /waitForEvent
端点一样,您需要确保每个“消费者”都得到消息。
这仍然是可能的,但需要这样的东西:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/waitForEvent")
public Uni<String> waitForEvent() {
return Uni.createFrom().emitter(emitter -> {
MessageConsumer<String> consumer = bus.consumer("test");
consumer.handler(m -> {
emitter.complete(m.body());
consumer.unregisterAndForget();
})
.ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
}
@GET
@Path("/send")
public void test() {
bus.publish("test", "send test event");
}
请务必使用事件总线的 io.vertx.mutiny.core.eventbus.EventBus
变体。