Vert.x 如何 pass/get 从 REST 到消息总线的消息?
Vert.x how to pass/get messages from REST to message bus?
我想通过 REST 将消息传递给总线,然后取回它。但我无法正确设置消息总线接收器,它抛出 java.lang.IllegalStateException
: Response has already been written。在现实生活中,消息总线应该从不同的来源接收消息并将消息传递给另一个目标。因此我们只需要将消息发布到总线上。但是如何正确读取消息并处理所有消息呢?例如,从 REST 界面:阅读该消息!
我的简单应用启动:
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new RESTVerticle());
vertx.deployVerticle(new Receiver());
EventBus eventBus = vertx.eventBus();
eventBus.registerDefaultCodec(MessageDTO.class, new CustomMessageCodec());
}
休息部分
public class RESTVerticle extends AbstractVerticle {
private EventBus eventBus = null;
@Override
public void start() throws Exception {
Router router = Router.router(vertx);
eventBus = vertx.eventBus();
router.route().handler(BodyHandler.create());
router.route().handler(CorsHandler.create("*")
.allowedMethod(HttpMethod.GET)
.allowedHeader("Content-Type"));
router.post("/api/message").handler(this::publishToEventBus);
// router.get("/api/messagelist").handler(this::getMessagesFromBus);
router.route("/*").handler(StaticHandler.create());
vertx.createHttpServer().requestHandler(router::accept).listen(9999);
System.out.println("Service running at 0.0.0.0:9999");
}
private void publishToEventBus(RoutingContext routingContext) {
System.out.println("routingContext.getBodyAsString() " + routingContext.getBodyAsString());
final MessageDTO message = Json.decodeValue(routingContext.getBodyAsString(),
MessageDTO.class);
HttpServerResponse response = routingContext.response();
response.setStatusCode(201)
.putHeader("content-type", "application/json; charset=utf-8")
.end(Json.encodePrettily(message));
eventBus.publish("messagesBus", message);
}
接收方:我把它移到另一个地方 class,但没有用
public class Receiver extends AbstractVerticle {
@Override
public void start() throws Exception {
EventBus eventBus = vertx.eventBus();
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
router.route().handler(CorsHandler.create("*")
.allowedMethod(HttpMethod.GET)
.allowedHeader("Content-Type"));
router.get("/api/messagelist").handler(this::getMessagesFromBus);
router.route("/*").handler(StaticHandler.create());
vertx.createHttpServer().requestHandler(router::accept).listen(9998);
System.out.println("Service Receiver running at 0.0.0.0:9998");
private void getMessagesFromBus(RoutingContext routingContext) {
EventBus eventBus = vertx.eventBus();
eventBus.consumer("messagesBus", message -> {
MessageDTO customMessage = (MessageDTO) message.body();
HttpServerResponse response = routingContext.response();
System.out.println("Receiver ->>>>>>>> " + customMessage);
if (customMessage != null) {
response.putHeader("content-type", "application/json; charset=utf-8")
.end(Json.encodePrettily(customMessage));
}
response.closed();
});
}
所以如果我 post 向 REST 发送消息并且处理程序将其发布到总线,当我在运行时得到 http://localhost:9998/api/messagelist 它是 return json,但是第二次它抛出异常
java.lang.IllegalStateException: Response has already been written
at io.vertx.core.http.impl.HttpServerResponseImpl.checkWritten(HttpServerResponseImpl.java:561)
at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:154)
at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:52)
at com.project.backend.Receiver.lambda$getMessagesFromBus[=15=](Receiver.java:55)
at io.vertx.core.eventbus.impl.HandlerRegistration.handleMessage(HandlerRegistration.java:207)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:201)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler7(EventBusImpl.java:498)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask(ContextImpl.java:335)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:745)
Receiver ->>>>>>>> Message{username=Aaaewfewf2d, message=41414wefwef2d2}
如何正确获取接收方的所有消息?或者如果总线收到消息,我应该立即将它们存储到数据库中吗?消息总线能否保留消息而不丢失它们?
谢谢
我没有机会测试你的代码,但似乎发布操作抛出异常,vertx 将尝试发回错误消息。但是您已经回复并结束了连接。
现在错误可能来自您的编解码器,但由于 vertx 的异步性质,您只能在稍后阶段看到它并被内部错误处理程序破坏。
每次点击入口点“/api/messagelist”都会创建一个具有请求路由上下文的新消费者。
第一个请求将创建消费者并回复请求。当第二条消息发布时,该消费者将收到该消息并将回复先前的请求(实例)并且该请求已关闭。
我认为您误解了事件总线的用途,我强烈建议您阅读文档。
http://vertx.io/docs/vertx-core/java/#event_bus
我想通过 REST 将消息传递给总线,然后取回它。但我无法正确设置消息总线接收器,它抛出 java.lang.IllegalStateException
: Response has already been written。在现实生活中,消息总线应该从不同的来源接收消息并将消息传递给另一个目标。因此我们只需要将消息发布到总线上。但是如何正确读取消息并处理所有消息呢?例如,从 REST 界面:阅读该消息!
我的简单应用启动:
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new RESTVerticle());
vertx.deployVerticle(new Receiver());
EventBus eventBus = vertx.eventBus();
eventBus.registerDefaultCodec(MessageDTO.class, new CustomMessageCodec());
}
休息部分
public class RESTVerticle extends AbstractVerticle {
private EventBus eventBus = null;
@Override
public void start() throws Exception {
Router router = Router.router(vertx);
eventBus = vertx.eventBus();
router.route().handler(BodyHandler.create());
router.route().handler(CorsHandler.create("*")
.allowedMethod(HttpMethod.GET)
.allowedHeader("Content-Type"));
router.post("/api/message").handler(this::publishToEventBus);
// router.get("/api/messagelist").handler(this::getMessagesFromBus);
router.route("/*").handler(StaticHandler.create());
vertx.createHttpServer().requestHandler(router::accept).listen(9999);
System.out.println("Service running at 0.0.0.0:9999");
}
private void publishToEventBus(RoutingContext routingContext) {
System.out.println("routingContext.getBodyAsString() " + routingContext.getBodyAsString());
final MessageDTO message = Json.decodeValue(routingContext.getBodyAsString(),
MessageDTO.class);
HttpServerResponse response = routingContext.response();
response.setStatusCode(201)
.putHeader("content-type", "application/json; charset=utf-8")
.end(Json.encodePrettily(message));
eventBus.publish("messagesBus", message);
}
接收方:我把它移到另一个地方 class,但没有用
public class Receiver extends AbstractVerticle {
@Override
public void start() throws Exception {
EventBus eventBus = vertx.eventBus();
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
router.route().handler(CorsHandler.create("*")
.allowedMethod(HttpMethod.GET)
.allowedHeader("Content-Type"));
router.get("/api/messagelist").handler(this::getMessagesFromBus);
router.route("/*").handler(StaticHandler.create());
vertx.createHttpServer().requestHandler(router::accept).listen(9998);
System.out.println("Service Receiver running at 0.0.0.0:9998");
private void getMessagesFromBus(RoutingContext routingContext) {
EventBus eventBus = vertx.eventBus();
eventBus.consumer("messagesBus", message -> {
MessageDTO customMessage = (MessageDTO) message.body();
HttpServerResponse response = routingContext.response();
System.out.println("Receiver ->>>>>>>> " + customMessage);
if (customMessage != null) {
response.putHeader("content-type", "application/json; charset=utf-8")
.end(Json.encodePrettily(customMessage));
}
response.closed();
});
}
所以如果我 post 向 REST 发送消息并且处理程序将其发布到总线,当我在运行时得到 http://localhost:9998/api/messagelist 它是 return json,但是第二次它抛出异常
java.lang.IllegalStateException: Response has already been written
at io.vertx.core.http.impl.HttpServerResponseImpl.checkWritten(HttpServerResponseImpl.java:561)
at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:154)
at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:52)
at com.project.backend.Receiver.lambda$getMessagesFromBus[=15=](Receiver.java:55)
at io.vertx.core.eventbus.impl.HandlerRegistration.handleMessage(HandlerRegistration.java:207)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:201)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler7(EventBusImpl.java:498)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask(ContextImpl.java:335)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:745)
Receiver ->>>>>>>> Message{username=Aaaewfewf2d, message=41414wefwef2d2}
如何正确获取接收方的所有消息?或者如果总线收到消息,我应该立即将它们存储到数据库中吗?消息总线能否保留消息而不丢失它们?
谢谢
我没有机会测试你的代码,但似乎发布操作抛出异常,vertx 将尝试发回错误消息。但是您已经回复并结束了连接。
现在错误可能来自您的编解码器,但由于 vertx 的异步性质,您只能在稍后阶段看到它并被内部错误处理程序破坏。
每次点击入口点“/api/messagelist”都会创建一个具有请求路由上下文的新消费者。
第一个请求将创建消费者并回复请求。当第二条消息发布时,该消费者将收到该消息并将回复先前的请求(实例)并且该请求已关闭。
我认为您误解了事件总线的用途,我强烈建议您阅读文档。 http://vertx.io/docs/vertx-core/java/#event_bus