使用异步函数迭代
Iterate with asynchronous functions
使用 Twitch API 和 vert.x
- 我希望使用 WebClient
和 Twitch 的 cursor
响应不断向 Twitch 的 API 发送请求一页一页地看。但是,由于 vert.x 的异步性质,我不确定如何返回并继续查询直到满足条件。
到目前为止,这是我的代码
public void getEntireStreamList(Handler<AsyncResult<JsonObject>> handler) {
JsonObject data = new JsonObject();
getLiveChannels(100, result -> {
if(result.succeeded()) {
JsonObject json = result.result();
String cursor = json.getJsonObject("pagination").getString("cursor");
data.put("data", json.getJsonArray("data"));
if(json.getJsonArray("data").size() < 100) { // IF NOT LAST PAGE
// GO BACK AND DO AGAIN WITH CURSOR IN REQUEST
}
handler.handle(Future.succeededFuture(data));
} else
handler.handle(Future.failedFuture(result.cause()));
});
}
理想情况下,我可以使用上一个请求中的 cursor
字符串调用 getLiveChannels
以继续搜索。
您将需要使用 Future 组合。
这是我针对您的问题的代码:
public void getEntireStreamList(Handler<AsyncResult<JsonObject>> handler) {
JsonArray data = new JsonArray();
// create initial Future for first function call
Future<JsonObject> initFuture = Future.future();
// complete Future when getLiveChannels completes
// fail on exception
getLiveChannels(100, initFuture.completer());
// Create a callback that returns a Future
// for composition.
final AtomicReference<Function<JsonObject, Future<JsonObject>>> callback = new AtomicReference<>();
// Create Function that calls composition with itself.
// This is similar to recursion.
Function<JsonObject, Future<JsonObject>> cb = new Function<JsonObject, Future<JsonObject>>() {
@Override
public Future<JsonObject> apply(JsonObject json) {
// new Future to return
Future<JsonObject> f = Future.future();
// Do what you wanna do with the data
String cursor = json.getJsonObject("pagination").getString("cursor");
data.addAll(json.getJsonArray("data"));
// IF NOT LAST PAGE
if(json.getJsonArray("data").size() == 100) {
// get more live channels with cursor
getLiveChannels(100, cursor, f.completer());
// return composed Future
return f.compose(this);
}
// Otherwise return completed Future with results.
f.complete(new JsonObject().put("data", data));
return f;
}
};
Future<JsonObject> composite = initFuture.compose(cb);
// Set handler on composite Future (ALL composed futures together)
composite.setHandler(result -> handler.handle(result));
}
如果您阅读 sequential Future composition 上的 Vert.x 文档,代码 + 注释应该不言而喻。
使用 Twitch API 和 vert.x
- 我希望使用 WebClient
和 Twitch 的 cursor
响应不断向 Twitch 的 API 发送请求一页一页地看。但是,由于 vert.x 的异步性质,我不确定如何返回并继续查询直到满足条件。
到目前为止,这是我的代码
public void getEntireStreamList(Handler<AsyncResult<JsonObject>> handler) {
JsonObject data = new JsonObject();
getLiveChannels(100, result -> {
if(result.succeeded()) {
JsonObject json = result.result();
String cursor = json.getJsonObject("pagination").getString("cursor");
data.put("data", json.getJsonArray("data"));
if(json.getJsonArray("data").size() < 100) { // IF NOT LAST PAGE
// GO BACK AND DO AGAIN WITH CURSOR IN REQUEST
}
handler.handle(Future.succeededFuture(data));
} else
handler.handle(Future.failedFuture(result.cause()));
});
}
理想情况下,我可以使用上一个请求中的 cursor
字符串调用 getLiveChannels
以继续搜索。
您将需要使用 Future 组合。
这是我针对您的问题的代码:
public void getEntireStreamList(Handler<AsyncResult<JsonObject>> handler) {
JsonArray data = new JsonArray();
// create initial Future for first function call
Future<JsonObject> initFuture = Future.future();
// complete Future when getLiveChannels completes
// fail on exception
getLiveChannels(100, initFuture.completer());
// Create a callback that returns a Future
// for composition.
final AtomicReference<Function<JsonObject, Future<JsonObject>>> callback = new AtomicReference<>();
// Create Function that calls composition with itself.
// This is similar to recursion.
Function<JsonObject, Future<JsonObject>> cb = new Function<JsonObject, Future<JsonObject>>() {
@Override
public Future<JsonObject> apply(JsonObject json) {
// new Future to return
Future<JsonObject> f = Future.future();
// Do what you wanna do with the data
String cursor = json.getJsonObject("pagination").getString("cursor");
data.addAll(json.getJsonArray("data"));
// IF NOT LAST PAGE
if(json.getJsonArray("data").size() == 100) {
// get more live channels with cursor
getLiveChannels(100, cursor, f.completer());
// return composed Future
return f.compose(this);
}
// Otherwise return completed Future with results.
f.complete(new JsonObject().put("data", data));
return f;
}
};
Future<JsonObject> composite = initFuture.compose(cb);
// Set handler on composite Future (ALL composed futures together)
composite.setHandler(result -> handler.handle(result));
}
如果您阅读 sequential Future composition 上的 Vert.x 文档,代码 + 注释应该不言而喻。