子请求的 CompletableFuture
CompletableFuture for child requests
我正在尝试理解 Java 8 中的 CompletableFuture。作为其中的一部分,我正在尝试进行一些 REST 调用以巩固我的理解。我正在使用这个库进行 REST 调用:https://github.com/AsyncHttpClient/async-http-client.
请注意,此库 returns 是 GET 调用的响应对象。
以下是我正在尝试做的事情:
- 调用这个 URL 给出用户列表:https://jsonplaceholder.typicode.com/users
- 使用 GSON 将响应转换为用户对象列表。
- 遍历列表中的每个用户对象,获取用户ID,然后从以下URL中获取用户创建的Post列表:https://jsonplaceholder.typicode.com/posts?userId=1
- 使用 GSON 将每个 post 响应转换为 Post 对象。
构建用户Post对象的集合,每个对象都有一个用户对象和用户创建的post列表。
public class UserPosts {
private final User user;
private final List<Post> posts;
public UserPosts(User user, List<Post> posts) {
this.user = user;
this.posts = posts;
}
@Override
public String toString() {
return "user = " + this.user + " \n" + "post = " + posts+ " \n \n";
}
}
我目前的实现方式如下:
package com.CompletableFuture;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.asynchttpclient.Response;
import com.http.HttpResponse;
import com.http.HttpUtil;
import com.model.Post;
import com.model.User;
import com.model.UserPosts;
/**
* Created by vm on 8/20/18.
*/
class UserPostResponse {
private final User user;
private final Future<Response> postResponse;
UserPostResponse(User user, Future<Response> postResponse) {
this.user = user;
this.postResponse = postResponse;
}
public User getUser() {
return user;
}
public Future<Response> getPostResponse() {
return postResponse;
}
}
public class HttpCompletableFuture extends HttpResponse {
private Function<Future<Response>, List<User>> userResponseToObject = user -> {
try {
return super.convertResponseToUser(Optional.of(user.get().getResponseBody())).get();
} catch (Exception e) {
e.printStackTrace();
return null;
}
};
private Function<Future<Response>, List<Post>> postResponseToObject = post -> {
try {
return super.convertResponseToPost(Optional.of(post.get().getResponseBody())).get();
} catch (Exception e) {
e.printStackTrace();
return null;
}
};
private Function<UserPostResponse, UserPosts> buildUserPosts = (userPostResponse) -> {
try {
return new UserPosts(userPostResponse.getUser(), postResponseToObject.apply(userPostResponse.getPostResponse()));
} catch (Exception e) {
e.printStackTrace();
return null;
}
};
private Function<User, UserPostResponse> getPostResponseForUser = user -> {
Future<Response> resp = super.getPostsForUser(user.getId());
return new UserPostResponse(user, resp);
};
public HttpCompletableFuture() {
super(HttpUtil.getInstance());
}
public List<UserPosts> getUserPosts() {
try {
CompletableFuture<List<UserPosts>> usersFuture = CompletableFuture
.supplyAsync(() -> super.getUsers())
.thenApply(userResponseToObject)
.thenApply((List<User> users)-> users.stream().map(getPostResponseForUser).collect(Collectors.toList()))
.thenApply((List<UserPostResponse> userPostResponses ) -> userPostResponses.stream().map(buildUserPosts).collect(Collectors.toList()));
List<UserPosts> users = usersFuture.get();
System.out.println(users);
return users;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
但是,我不确定我这样做的方式是否正确。更具体地说,在 userResponseToObject
和 postResponseToObject
函数中,我在 Future 上调用 get()
方法,这将是阻塞的。
有没有更好的实现方式?
根据您要用于管理阻塞响应的策略,您至少可以探索这些实现:
1) 超时调用class CompletableFuture 的重载方法get:
List<UserPosts> users = usersFuture.get(long timeout, TimeUnit unit);
来自文档:
Waits if necessary for at most the given time for this future to
complete, and then returns its result, if available.
2) 使用替代方法 getNow:
列出用户 = usersFuture.getNow(T valueIfAbsent);
Returns the result value (or throws any encountered exception) if
completed, else returns the given valueIfAbsent.
3) 使用 CompletableFuture 而不是 Future,您可以强制手动解锁 get 调用 complete :
usersFuture.complete("Manual CompletableFuture's Result")
如果您打算使用 CompletableFuture
,您应该使用 async-http-client 库中的 ListenableFuture
。 ListenableFuture
可以转换为 CompletableFuture
.
使用 CompletableFuture
的优点是您可以编写处理 Response
对象的逻辑,而无需了解任何有关 futures 或线程的知识。假设你写了以下 4 个方法。 2 人发出请求,2 人解析响应:
ListenableFuture<Response> requestUsers() {
}
ListenableFuture<Response> requestPosts(User u) {
}
List<User> parseUsers(Response r) {
}
List<UserPost> parseUserPosts(Response r, User u) {
}
现在我们可以编写一个非阻塞方法来检索给定用户的帖子:
CompletableFuture<List<UserPost>> userPosts(User u) {
return requestPosts(u)
.toCompletableFuture()
.thenApply(r -> parseUserPosts(r, u));
}
以及为所有用户阅读所有帖子的阻止方法:
List<UserPost> getAllPosts() {
// issue all requests
List<CompletableFuture<List<UserPost>>> postFutures = requestUsers()
.toCompletableFuture()
.thenApply(userRequest -> parseUsers(userRequest)
.stream()
.map(this::userPosts)
.collect(toList())
).join();
// collect the results
return postFutures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(toList());
}
我正在尝试理解 Java 8 中的 CompletableFuture。作为其中的一部分,我正在尝试进行一些 REST 调用以巩固我的理解。我正在使用这个库进行 REST 调用:https://github.com/AsyncHttpClient/async-http-client.
请注意,此库 returns 是 GET 调用的响应对象。
以下是我正在尝试做的事情:
- 调用这个 URL 给出用户列表:https://jsonplaceholder.typicode.com/users
- 使用 GSON 将响应转换为用户对象列表。
- 遍历列表中的每个用户对象,获取用户ID,然后从以下URL中获取用户创建的Post列表:https://jsonplaceholder.typicode.com/posts?userId=1
- 使用 GSON 将每个 post 响应转换为 Post 对象。
构建用户Post对象的集合,每个对象都有一个用户对象和用户创建的post列表。
public class UserPosts { private final User user; private final List<Post> posts; public UserPosts(User user, List<Post> posts) { this.user = user; this.posts = posts; } @Override public String toString() { return "user = " + this.user + " \n" + "post = " + posts+ " \n \n"; }
}
我目前的实现方式如下:
package com.CompletableFuture;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.asynchttpclient.Response;
import com.http.HttpResponse;
import com.http.HttpUtil;
import com.model.Post;
import com.model.User;
import com.model.UserPosts;
/**
* Created by vm on 8/20/18.
*/
class UserPostResponse {
private final User user;
private final Future<Response> postResponse;
UserPostResponse(User user, Future<Response> postResponse) {
this.user = user;
this.postResponse = postResponse;
}
public User getUser() {
return user;
}
public Future<Response> getPostResponse() {
return postResponse;
}
}
public class HttpCompletableFuture extends HttpResponse {
private Function<Future<Response>, List<User>> userResponseToObject = user -> {
try {
return super.convertResponseToUser(Optional.of(user.get().getResponseBody())).get();
} catch (Exception e) {
e.printStackTrace();
return null;
}
};
private Function<Future<Response>, List<Post>> postResponseToObject = post -> {
try {
return super.convertResponseToPost(Optional.of(post.get().getResponseBody())).get();
} catch (Exception e) {
e.printStackTrace();
return null;
}
};
private Function<UserPostResponse, UserPosts> buildUserPosts = (userPostResponse) -> {
try {
return new UserPosts(userPostResponse.getUser(), postResponseToObject.apply(userPostResponse.getPostResponse()));
} catch (Exception e) {
e.printStackTrace();
return null;
}
};
private Function<User, UserPostResponse> getPostResponseForUser = user -> {
Future<Response> resp = super.getPostsForUser(user.getId());
return new UserPostResponse(user, resp);
};
public HttpCompletableFuture() {
super(HttpUtil.getInstance());
}
public List<UserPosts> getUserPosts() {
try {
CompletableFuture<List<UserPosts>> usersFuture = CompletableFuture
.supplyAsync(() -> super.getUsers())
.thenApply(userResponseToObject)
.thenApply((List<User> users)-> users.stream().map(getPostResponseForUser).collect(Collectors.toList()))
.thenApply((List<UserPostResponse> userPostResponses ) -> userPostResponses.stream().map(buildUserPosts).collect(Collectors.toList()));
List<UserPosts> users = usersFuture.get();
System.out.println(users);
return users;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
但是,我不确定我这样做的方式是否正确。更具体地说,在 userResponseToObject
和 postResponseToObject
函数中,我在 Future 上调用 get()
方法,这将是阻塞的。
有没有更好的实现方式?
根据您要用于管理阻塞响应的策略,您至少可以探索这些实现:
1) 超时调用class CompletableFuture 的重载方法get:
List<UserPosts> users = usersFuture.get(long timeout, TimeUnit unit);
来自文档:
Waits if necessary for at most the given time for this future to complete, and then returns its result, if available.
2) 使用替代方法 getNow:
列出用户 = usersFuture.getNow(T valueIfAbsent);
Returns the result value (or throws any encountered exception) if completed, else returns the given valueIfAbsent.
3) 使用 CompletableFuture 而不是 Future,您可以强制手动解锁 get 调用 complete :
usersFuture.complete("Manual CompletableFuture's Result")
如果您打算使用 CompletableFuture
,您应该使用 async-http-client 库中的 ListenableFuture
。 ListenableFuture
可以转换为 CompletableFuture
.
使用 CompletableFuture
的优点是您可以编写处理 Response
对象的逻辑,而无需了解任何有关 futures 或线程的知识。假设你写了以下 4 个方法。 2 人发出请求,2 人解析响应:
ListenableFuture<Response> requestUsers() {
}
ListenableFuture<Response> requestPosts(User u) {
}
List<User> parseUsers(Response r) {
}
List<UserPost> parseUserPosts(Response r, User u) {
}
现在我们可以编写一个非阻塞方法来检索给定用户的帖子:
CompletableFuture<List<UserPost>> userPosts(User u) {
return requestPosts(u)
.toCompletableFuture()
.thenApply(r -> parseUserPosts(r, u));
}
以及为所有用户阅读所有帖子的阻止方法:
List<UserPost> getAllPosts() {
// issue all requests
List<CompletableFuture<List<UserPost>>> postFutures = requestUsers()
.toCompletableFuture()
.thenApply(userRequest -> parseUsers(userRequest)
.stream()
.map(this::userPosts)
.collect(toList())
).join();
// collect the results
return postFutures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(toList());
}