Vertx/RxJava/WebClient/ApiGateway/Reactive
Vertx/RxJava/WebClient/ApiGateway/Reactive
我正在使用 vert.x 和 RxJava 开发 Apigateway。我想发送 2 Api
秒的反应性请求,从他们两个那里得到响应,然后 HttpServer
发送合并的 JSON。但是 onComplete()
提前执行并且 returns 空 JSON。我认为问题出在 vert.x 的 asynchronous
字符上,但我不知道到底出了什么问题。
这是我的方法:
private void dispatchBoth(RoutingContext routingContext) {
Observer<String> observer = new Observer<String>() {
JsonArray jsonArray = new JsonArray();
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("Start");
}
@Override
public void onNext(String s) {
Thread t = new Thread(() -> {
if(s=="/api/userApi/selectAllUsers") {
WebClient client = WebClient.create(vertx);
client
.get(8081, "localhost", s)
.send(ar->{
if (ar.succeeded()) {
HttpResponse<Buffer> response = ar.result();
jsonArray.addAll(response.bodyAsJsonArray());
System.out.println(jsonArray.encodePrettily());
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
}else if(s=="/api/holidayApi/selectAllHolidays") {
WebClient client = WebClient.create(vertx);
client
.get(8080, "localhost", s)
.send(ar -> {
if (ar.succeeded()) {
HttpResponse<Buffer> response = ar.result();
jsonArray.addAll(response.bodyAsJsonArray());
// System.out.println(jsonArray.encodePrettily());
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
}
});
t.start();
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
System.out.println(jsonArray.encodePrettily());
routingContext.response().end(jsonArray.encodePrettily());
}
};
Observable.fromArray(com).subscribe(observer);
}
这就是我在控制台上得到的输出:
[ ]
[ {
"holidayId" : 2,
"userId" : 3,
"place" : "Poland",
"date" : {
"year" : 2016,
"month" : "DECEMBER",
"dayOfMonth" : 29,
"dayOfWeek" : "THURSDAY",
"era" : "CE",
"dayOfYear" : 364,
"leapYear" : true,
"monthValue" : 12,
"chronology" : {
"id" : "ISO",
"calendarType" : "iso8601"
}
}
}, {
"holidayId" : 10,
"userId" : 1,
"place" : "Netherland",
"date" : {
"year" : 2020,
"month" : "JANUARY",
"dayOfMonth" : 21,
"dayOfWeek" : "TUESDAY",
"era" : "CE",
"dayOfYear" : 21,
"leapYear" : true,
"monthValue" : 1,
"chronology" : {
"id" : "ISO",
"calendarType" : "iso8601"
}
}
}, {
"userId" : 1,
"name" : "Kacper",
"phone_number" : "667667202"
}, {
"userId" : 3,
"name" : "Kamil",
"phone_number" : "6734583443"
}, {
"userId" : 4,
"name" : "Janek",
"phone_number" : "231253575"
}, {
"userId" : 5,
"name" : "Grzegorz",
"phone_number" : "123456789"
}, {
"userId" : 6,
"name" : "Justin",
"phone_number" : "111000111"
}, {
"userId" : 8,
"name" : "Mike",
"phone_number" : "997"
}, {
"userId" : 9,
"name" : "Gorge",
"phone_number" : "991"
} ]
我看到你在 onNext()
中开始了一个新话题?当您使用 Thread.start()
启动新线程时,它将并行执行其工作,而当前的 Observable.onNext()
将认为工作已完成。这将导致 onComplete()
被调用。
没有人在等待线程的工作完成。希望你明白我的意思。
现在,我对vert.x并不熟悉,但按照RxJava的方式,似乎没有必要使用Thread
。您可以将其删除并将其余逻辑直接保留在 onNext()
方法中。
onComplete
准时执行:当字符串输入流结束时。您需要另一种方法来等待所有 I/O 操作完成的时刻。这是一个棘手的部分,我不知道 Vertx 或 RxJava 是否可以做到这一点,但标准 Java API 可以,使用 CompletableFuture
。所以我们创建从 CompletableFuture 到 Handler 的适配器,它保存一个 I/O 操作的结果:
class HandelerFuture extends CompletableFuture<JsonArray>
implements Handler<AsyncResult<HttpResponse<Buffer>>> {
@Override
public void handle(AsyncResult<HttpResponse<Buffer>> ar) {
if (ar.succeeded()) {
JsonArray array = ar.result().bodyAsJsonArray();
super.complete(array);
} else {
super.completeExceptionally(ar.cause());
}
}
}
此外,您不需要将 onNext
方法的主体包裹在 Tread
中。其次,您无需检查传递的 url 字符串是什么,因为它没有任何区别。第三,您仅使用 Observer 来处理 url 字符串列表——这太复杂了。普通循环就足够了。
CompletableFuture<HandelerFuture[]> dispatchBoth(String... urls) {
ArrayList<HandelerFuture> futures = new ArrayList<>(); // all results
for (String url : urls) {
HandelerFuture future = new HandelerFuture();
futures.add(future);
WebClient client = WebClient.create(vertx);
client.get(8081, "localhost", url)
.send(future);
}
CompletableFuture all = new CompletableFuture();
HandelerFuture[] array = futures.toArray(new HandelerFuture[0]);
CompletableFuture.allOf(array)
.thenRunAsync(() -> all.complete(array));
return all;
}
那么可以运行如下:
CompletableFuture<HandelerFuture[]> future = dispatchBoth(com);
HandelerFuture[] results = future.get();
JsonArray finalArray;
for (HandelerFuture result:results) {
try {
// extract partial json array
JsonArray partialArray = result.get();
// combine partialArray with finalArray somehow
} catch (Exception e) {
// this is the exception got in handle() method as ar.cause().
e.printStackTrace();
}
}
routingContext.response().end(finalArray.encodePrettily());
你没有说明你将如何组合 json 个数组,所以我没有实现它。
我正在使用 vert.x 和 RxJava 开发 Apigateway。我想发送 2 Api
秒的反应性请求,从他们两个那里得到响应,然后 HttpServer
发送合并的 JSON。但是 onComplete()
提前执行并且 returns 空 JSON。我认为问题出在 vert.x 的 asynchronous
字符上,但我不知道到底出了什么问题。
这是我的方法:
private void dispatchBoth(RoutingContext routingContext) {
Observer<String> observer = new Observer<String>() {
JsonArray jsonArray = new JsonArray();
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("Start");
}
@Override
public void onNext(String s) {
Thread t = new Thread(() -> {
if(s=="/api/userApi/selectAllUsers") {
WebClient client = WebClient.create(vertx);
client
.get(8081, "localhost", s)
.send(ar->{
if (ar.succeeded()) {
HttpResponse<Buffer> response = ar.result();
jsonArray.addAll(response.bodyAsJsonArray());
System.out.println(jsonArray.encodePrettily());
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
}else if(s=="/api/holidayApi/selectAllHolidays") {
WebClient client = WebClient.create(vertx);
client
.get(8080, "localhost", s)
.send(ar -> {
if (ar.succeeded()) {
HttpResponse<Buffer> response = ar.result();
jsonArray.addAll(response.bodyAsJsonArray());
// System.out.println(jsonArray.encodePrettily());
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
}
});
t.start();
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
System.out.println(jsonArray.encodePrettily());
routingContext.response().end(jsonArray.encodePrettily());
}
};
Observable.fromArray(com).subscribe(observer);
}
这就是我在控制台上得到的输出:
[ ]
[ {
"holidayId" : 2,
"userId" : 3,
"place" : "Poland",
"date" : {
"year" : 2016,
"month" : "DECEMBER",
"dayOfMonth" : 29,
"dayOfWeek" : "THURSDAY",
"era" : "CE",
"dayOfYear" : 364,
"leapYear" : true,
"monthValue" : 12,
"chronology" : {
"id" : "ISO",
"calendarType" : "iso8601"
}
}
}, {
"holidayId" : 10,
"userId" : 1,
"place" : "Netherland",
"date" : {
"year" : 2020,
"month" : "JANUARY",
"dayOfMonth" : 21,
"dayOfWeek" : "TUESDAY",
"era" : "CE",
"dayOfYear" : 21,
"leapYear" : true,
"monthValue" : 1,
"chronology" : {
"id" : "ISO",
"calendarType" : "iso8601"
}
}
}, {
"userId" : 1,
"name" : "Kacper",
"phone_number" : "667667202"
}, {
"userId" : 3,
"name" : "Kamil",
"phone_number" : "6734583443"
}, {
"userId" : 4,
"name" : "Janek",
"phone_number" : "231253575"
}, {
"userId" : 5,
"name" : "Grzegorz",
"phone_number" : "123456789"
}, {
"userId" : 6,
"name" : "Justin",
"phone_number" : "111000111"
}, {
"userId" : 8,
"name" : "Mike",
"phone_number" : "997"
}, {
"userId" : 9,
"name" : "Gorge",
"phone_number" : "991"
} ]
我看到你在 onNext()
中开始了一个新话题?当您使用 Thread.start()
启动新线程时,它将并行执行其工作,而当前的 Observable.onNext()
将认为工作已完成。这将导致 onComplete()
被调用。
没有人在等待线程的工作完成。希望你明白我的意思。
现在,我对vert.x并不熟悉,但按照RxJava的方式,似乎没有必要使用Thread
。您可以将其删除并将其余逻辑直接保留在 onNext()
方法中。
onComplete
准时执行:当字符串输入流结束时。您需要另一种方法来等待所有 I/O 操作完成的时刻。这是一个棘手的部分,我不知道 Vertx 或 RxJava 是否可以做到这一点,但标准 Java API 可以,使用 CompletableFuture
。所以我们创建从 CompletableFuture 到 Handler 的适配器,它保存一个 I/O 操作的结果:
class HandelerFuture extends CompletableFuture<JsonArray>
implements Handler<AsyncResult<HttpResponse<Buffer>>> {
@Override
public void handle(AsyncResult<HttpResponse<Buffer>> ar) {
if (ar.succeeded()) {
JsonArray array = ar.result().bodyAsJsonArray();
super.complete(array);
} else {
super.completeExceptionally(ar.cause());
}
}
}
此外,您不需要将 onNext
方法的主体包裹在 Tread
中。其次,您无需检查传递的 url 字符串是什么,因为它没有任何区别。第三,您仅使用 Observer 来处理 url 字符串列表——这太复杂了。普通循环就足够了。
CompletableFuture<HandelerFuture[]> dispatchBoth(String... urls) {
ArrayList<HandelerFuture> futures = new ArrayList<>(); // all results
for (String url : urls) {
HandelerFuture future = new HandelerFuture();
futures.add(future);
WebClient client = WebClient.create(vertx);
client.get(8081, "localhost", url)
.send(future);
}
CompletableFuture all = new CompletableFuture();
HandelerFuture[] array = futures.toArray(new HandelerFuture[0]);
CompletableFuture.allOf(array)
.thenRunAsync(() -> all.complete(array));
return all;
}
那么可以运行如下:
CompletableFuture<HandelerFuture[]> future = dispatchBoth(com);
HandelerFuture[] results = future.get();
JsonArray finalArray;
for (HandelerFuture result:results) {
try {
// extract partial json array
JsonArray partialArray = result.get();
// combine partialArray with finalArray somehow
} catch (Exception e) {
// this is the exception got in handle() method as ar.cause().
e.printStackTrace();
}
}
routingContext.response().end(finalArray.encodePrettily());
你没有说明你将如何组合 json 个数组,所以我没有实现它。