vert.x:从事件总线发布和消费消息
vert.x: publish and consume messages from event bus
我写了下面的代码:
public class VertxApp {
public static void main(String[] args) { // This is OK
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new ReceiveVerticle()); // line A
vertx.deployVerticle(new SendVerticle()); // line B
}
}
public class ReceiveVerticle extends AbstractVerticle{
@Override
public void start(Future<Void> startFuture) {
vertx.eventBus().consumer("address", message -> {
System.out.println("message received by receiver");
System.out.println(message.body());
});
}
}
public class SendVerticle extends AbstractVerticle {
@Override
public void start(Future<Void> startFuture) throws InterruptedException {
System.out.println("SendVerticle started!");
int i = 0;
for (i = 0; i < 5; i++) {
System.out.println("Sender sends a message " + i );
vertx.eventBus().publish("address", "message" + i);
}
}
}
此代码不一致。存在竞争条件。如果我 运行 代码多次,有时发送的 5 条消息都被消耗掉,有时 none 被消耗掉。
能否请您解释一下为什么这里存在竞争条件以及如何解决?
没有竞争条件,部署 Verticle 是一个异步操作,您的接收者 Verticle 可以在 发送者 Verticle 发送消息后注册消费者。
为确保操作按顺序进行,请使用带有处理程序参数的 deploy
方法:
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new ReceiveVerticle(), ar -> {
if (ar.succeeded()) {
vertx.deployVerticle(new SendVerticle());
} else {
// handle the problem -> ar.cause()
}
});
我写了下面的代码:
public class VertxApp {
public static void main(String[] args) { // This is OK
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new ReceiveVerticle()); // line A
vertx.deployVerticle(new SendVerticle()); // line B
}
}
public class ReceiveVerticle extends AbstractVerticle{
@Override
public void start(Future<Void> startFuture) {
vertx.eventBus().consumer("address", message -> {
System.out.println("message received by receiver");
System.out.println(message.body());
});
}
}
public class SendVerticle extends AbstractVerticle {
@Override
public void start(Future<Void> startFuture) throws InterruptedException {
System.out.println("SendVerticle started!");
int i = 0;
for (i = 0; i < 5; i++) {
System.out.println("Sender sends a message " + i );
vertx.eventBus().publish("address", "message" + i);
}
}
}
此代码不一致。存在竞争条件。如果我 运行 代码多次,有时发送的 5 条消息都被消耗掉,有时 none 被消耗掉。
能否请您解释一下为什么这里存在竞争条件以及如何解决?
没有竞争条件,部署 Verticle 是一个异步操作,您的接收者 Verticle 可以在 发送者 Verticle 发送消息后注册消费者。
为确保操作按顺序进行,请使用带有处理程序参数的 deploy
方法:
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new ReceiveVerticle(), ar -> {
if (ar.succeeded()) {
vertx.deployVerticle(new SendVerticle());
} else {
// handle the problem -> ar.cause()
}
});