event_bus 消费者支持 运行 异步。方法?

Does event_bus consumer support to run Async. method?

我是 Vert.x 异步编程的新手。我有 event_bus 消费者,一旦收到消息,就需要调用异步。方法 async_method_to_access_database 从数据库中获取数据库记录。 async_method_to_access_database 将 return 一个未来。代码如下所示。

这里我有一个问题:

里面event_bus消费者,可以运行一个Async。方法?我怀疑消费者处理程序是否支持 运行 Async。方法?在Vertx中,我了解到Vertilce包含事件循环,它支持Async。方法,我不确定 event_bus 消费者处理程序是否支持它?

EventBus eb = vertx.eventBus();

MessageConsumer<String> consumer = eb.consumer("my_address");
consumer.handler(message -> {
  
    Future<List<DataRecord>> listFuture = async_method_to_access_database();

    listFuture.onSuccess(ar->{

        doSomething();

    });

});

您可以在处理程序中使用异步代码。了解您有两种在事件总线上发送消息的方式的重要部分:request/publish.

  • 发布你只是fire-and-forget
  • 并且对于 request,您将注册一个处理程序,等待消费者通过调用 message.reply().
  • 来回答消息

在这两种情况下,您在消费者中的异步代码都将被执行,但是当您使用发送时,您可以选择在发送方有额外的逻辑,(例如:重复错误、验证响应等)

EventBus eb = vertx.eventBus();

// sender

eb.request("my_address","testmessage", h -> {
  if(h.succeeded()){
    System.out.println("successful access to db");
  }else{
    System.out.println(h.cause()); 
  }
});

// consumer

MessageConsumer<String> consumer = eb.consumer("my_address");
consumer.handler(message -> {
  
    Future<List<DataRecord>> listFuture = async_method_to_access_database();

    listFuture.onComplete(ar->{
        if (ar.succeeded()) {
            message.reply("updated elements successfully" + listFuture.size());
        }

        message.fail(1,"critical error") // will trigger error in sender
    });

});