如何正确 return 从 RxJava Single 到 Vertx Event Bus Consumer 的值
How to correctly return a value from a RxJava Single to a Vertx Event Bus Consumer
Verticle 中的以下代码将消息放在事件总线上
io.vertx.reactivex.core.Vertx rxVertx = io.vertx.reactivex.core.Vertx.newInstance(vertx);
Single<Message<Integer>> reply = rxVertx.eventBus().<Integer>rxSend("address", "param");
reply.subscribe(r -> {
// Do something with value
});
另一个 Verticle 消费消息:
vertx.eventBus().<Integer>consumer("address", h -> {
Integer integer = ... // call to getValue() method
h.reply(integer);
});
要returned 的值来自使用SQLClient 的mySql 数据库。目前检索值的代码在以下方法中:
private Single<Integer> getValue() {
return Single.create(source -> {
mySQLClient.getConnection(res -> {
if (res.succeeded()) {
SQLConnection connection = res.result();
connection.query("SELECT count(*) from myTable", result -> {
if (result.succeeded()) {
Integer i = result.result().getRows().get(0));
source.onSuccess(i);
}
});
} else {
source.onError(res.cause());
}
});
});
}
从消费者内部调用 getValue()
方法的正确方法是什么?
以下:
vertx.eventBus().<Integer>consumer("address", h -> {
Single<integer> single = getValue();
h.reply(single.subscribe(
s -> System.out.println(s));
});
打印出 h.reply
return 之前的值,但是如何 return 来自 single.subscribe()
的值,使其成为 [=18= 的参数]?
谢谢
您需要订阅结果并将结果传递给回复。你颠倒了语法。
getvalue().subscribe(result -> {
h.reply(result);
}, ex -> {
//Handle error
});
Verticle 中的以下代码将消息放在事件总线上
io.vertx.reactivex.core.Vertx rxVertx = io.vertx.reactivex.core.Vertx.newInstance(vertx);
Single<Message<Integer>> reply = rxVertx.eventBus().<Integer>rxSend("address", "param");
reply.subscribe(r -> {
// Do something with value
});
另一个 Verticle 消费消息:
vertx.eventBus().<Integer>consumer("address", h -> {
Integer integer = ... // call to getValue() method
h.reply(integer);
});
要returned 的值来自使用SQLClient 的mySql 数据库。目前检索值的代码在以下方法中:
private Single<Integer> getValue() {
return Single.create(source -> {
mySQLClient.getConnection(res -> {
if (res.succeeded()) {
SQLConnection connection = res.result();
connection.query("SELECT count(*) from myTable", result -> {
if (result.succeeded()) {
Integer i = result.result().getRows().get(0));
source.onSuccess(i);
}
});
} else {
source.onError(res.cause());
}
});
});
}
从消费者内部调用 getValue()
方法的正确方法是什么?
以下:
vertx.eventBus().<Integer>consumer("address", h -> {
Single<integer> single = getValue();
h.reply(single.subscribe(
s -> System.out.println(s));
});
打印出 h.reply
return 之前的值,但是如何 return 来自 single.subscribe()
的值,使其成为 [=18= 的参数]?
谢谢
您需要订阅结果并将结果传递给回复。你颠倒了语法。
getvalue().subscribe(result -> {
h.reply(result);
}, ex -> {
//Handle error
});