CompletableFuture - 运行 多个并行的 rest 调用并得到不同的结果
CompletableFuture - Run multiple rest calls in parallel and get different result
我有一个相当普遍或独特的要求。例如,我有以下 AccountDetails
列表:
List<AccountDetails>
class AccountDetails {
String bankAccountId;
String mortgageAccountId;
Integer noOfTrans;
String addressLine;
String externalLink;
}
除了bankAccountId
之外的所有上述字段都是从外部REST服务调用中提取的。
我想并行调用所有 REST 服务并更新列表中的每个对象:
所以,看起来像下面这样:
每个accountDetails
- 调用抵押 REST 服务并更新
martgageAccountId
字段(REST returns MortgageInfo 对象)
- 调用事务REST服务并更新
noOfTrans
字段(RESTreturnsTransactions
对象)
- 调用地址REST服务并更新
addressLine
字段(RESTreturnsAddress
对象)
- 调用 link REST 服务并更新
externalLink
字段。 (REST returns Links
对象)
我希望以上所有并行调用,并且针对列表中的每个 AcccountDetails
对象。
如果有异常,我想优雅地处理它。注意上面每个REST服务returns不同的自定义对象
我对如何使用 CompletableFuture
链接实现这一点感到困惑。
不确定 allOf
或 thenCombine
(只需要两个),或 thenCompose
应该使用以及如何将所有这些放在一起。
任何 examples/ideas?
AccountDetails accountDetails = new AccountDetails();
CompletableFuture.allOf(
CompletableFuture.
supplyAsync(() -> //CALL MORTAGE INFO REST, executor).
thenAccept(x -> {
accountDetails.setMortgageAccountId(x.getReqdField())
}).
handle(//HANDLE GRACEFULLY),
CompletableFuture.
supplyAsync(() -> //CALL SOME OTHER REST, executor).
thenAccept(x -> {
accountDetails.setNoOfTrans(x.getReqdField())
}).
handle(//HANDLE GRACEFULLY),
CompletableFuture.
supplyAsync(() -> //CALL SOME INFO REST, executor).
thenAccept(x -> {
accountDetails.setAddressLine(x.getReqdField())
}).
handle(//HANDLE GRACEFULLY),
CompletableFuture.
supplyAsync(() -> //CALL SOME OTHER REST, executor).
thenAccept(x -> {
accountDetails.setExternalLink(x.getReqdField())
}).
handle(//HANDLE GRACEFULLY),
).join();
既然你已经标记了 spring-boot
我想你会使用它并且你的服务是用 spring 框架编写的。所以我提供了一个与 spring 框架相关的答案。
首先,我创建了一个接口,用于将 rest API 实现为异步。
public interface AsyncRestCall<T> {
/** this is a hypothetical method with hypothetical params!*/
CompletableFuture<T> call(String bankAccountId);
String type();
}
然后你可以这样实现你的服务:
正如你在这个实现中看到的,我使用 MortgageRest
它代表 Mortgage
的休息服务。
@Service
public class MortgageService implements AsyncRestCall<MortgageInfo> {
private final MortgageRest mortgageRest;
@Autowired
public MortgageService(MortgageRest mortgageRest) {
this.mortgageRest = mortgageRest;
}
@Override
public CompletableFuture<MortgageInfo> call(String bankAccountId) {
return CompletableFuture.supplyAsync(() -> mortgageRest.service(bankAccountId));
}
@Override
public String type() {
return "mortgage";
}
}
按揭休息:
@Service
public class MortgageRest {
private RestTemplate restTemplate;
public MortgageRest(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
public MortgageInfo service(String bankAccountId) {
return new MortgageInfo("123455" + bankAccountId);
}
}
对于其他休息服务,请这样做。
@Service
public class TransactionService implements AsyncRestCall<Transactions> {
private final TransactionRest transactionRest;
public TransactionService(TransactionRest transactionRest) {
this.transactionRest = transactionRest;
}
@Override
public CompletableFuture<Transactions> call(String bankAccountId) {
return CompletableFuture.supplyAsync(transactionRest::service);
}
@Override
public String type() {
return "transactions";
}
}
交易休息:
@Service
public class TransactionRest {
public Transactions service() {
return new Transactions(12);
}
}
现在您需要访问所有 AsyncRestCall
实施。对于这个 porpuse 你可以声明一个 class 像这样的东西:
@Service
public class RestCallHolder {
private final List<AsyncRestCall> asyncRestCalls;
public RestCallHolder(List<AsyncRestCall> asyncRestCalls) {
this.asyncRestCalls = asyncRestCalls;
}
public List<AsyncRestCall> getAsyncRestCalls() {
return asyncRestCalls;
}
}
AccountDetailService
(你可以命名你喜欢的东西)使用 CompleteableFuture
并行调用其余服务。
在此服务中,每个 bankAccountId
休息调用将存储在 Map<String, Map<String, Object>> result = new HashMap<>();
中,外部映射键将存储 bankAccountId
值作为键,其值是他们将存储的休息服务调用在地图(内部地图)中。键是类型,值是休息呼叫响应。最后通过遍历 accountDetails 将更新其属性。
@Service
public class AccountDetailService {
private final RestCallHolder restCallHolder;
public AccountDetailService(RestCallHolder restCallHolder) {
this.restCallHolder = restCallHolder;
}
public List<AccountDetail> update(List<AccountDetail> accountDetails) {
Map<String, Map<String, Object>> result = new HashMap<>();
List<AccountDetail> finalAccountDetails = new ArrayList<>();
accountDetails.forEach(accountDetail -> {
List<CompletableFuture> futures = restCallHolder.getAsyncRestCalls()
.stream()
.map(rest -> rest.call(accountDetail.getBankAccountId()))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
.thenAccept(aVoid -> {
Map<String, Object> res = restCallHolder.getAsyncRestCalls()
.stream()
.map(rest -> new AbstractMap.SimpleEntry<>(rest.type(),
rest.call(accountDetail.getBankAccountId()).join()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
result.put(accountDetail.getBankAccountId(), res);
}
).handle((aVoid, throwable) -> {
return null; // handle the exception here
}).join();
}
);
accountDetails.forEach(accountDetail -> finalAccountDetails.add(AccountDetail.builder()
.bankAccountId(accountDetail.getBankAccountId())
.mortgageAccountId(((MortgageInfo) result.get(accountDetail.getBankAccountId()).get("mortgage")).getMortgageAccountId())
.noOfTrans(((Transactions) result.get(accountDetail.getBankAccountId()).get("transactions")).getNoOfTrans())
.build()));
return finalAccountDetails;
}
}
我会将获取字段值的责任交给模型对象本身。
这里有三种替代解决方案,使用并行流、流和执行器,以及 for 循环和执行器。
解决方案 1:
accounts.parallelStream()
.<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
account::updateAddressLine, account::updateExternalLink))
.map(RestRequest::new)
.forEach(RestRequest::run);
解决方案 2:
Executor executor = Executors.newFixedThreadPool(PARALLELISM);
accounts.stream()
.<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
account::updateAddressLine, account::updateExternalLink))
.map(RestRequest::new)
.forEach(executor::execute);
解决方案 3:
Executor executor = Executors.newFixedThreadPool(PARALLELISM);
for (AccountDetails account : accounts) {
execute(executor, account::updateMortgage);
execute(executor, account::updateNoOfTrans);
execute(executor, account::updateAddressLine);
execute(executor, account::updateExternalLink);
}
private static void execute(Executor executor, Runnable task) {
executor.execute(new RestRequest(task));
}
常用代码:
class RestRequest implements Runnable {
private final Runnable task;
RestRequest(Runnable task) {
this.task = task;
}
@Override
public void run() {
try {
task.run();
} catch (Exception e) {
// A request failed. Others will not be canceled.
}
}
}
class AccountDetails {
String bankAccountId;
String mortgageAccountId;
Integer noOfTrans;
String addressLine;
String externalLink;
void fetchMortgage() {
mortgageAccountId = MortgageService.getMortgage(bankAccountId).getAccountId();
}
void fetchNoOfTrans() {
noOfTrans = TransactionService.getTransactions(bankAccountId).getAmount();
}
void fetchAddressLine() {
addressLine = AddressService.getAddress(bankAccountId).getLine();
}
void fetchExternalLink() {
externalLink = LinkService.getLinks(bankAccountId).getExternal();
}
}
如果我简单点一下你的帐号class为:
class Account {
String fieldA;
String fieldB;
String fieldC;
Account(String fieldA, String fieldB, String fieldC) {
this.fieldA = fieldA;
this.fieldB = fieldB;
this.fieldC = fieldC;
}
}
然后您可以使用 CompletableFuture#allOf(...)
等待所有可完成的未来的结果,每个字段更新一个,然后分别从这些未来中检索结果。我们不能使用 allOf
的结果,因为它 returns 什么都没有(无效)。
Account account = CompletableFuture.allOf(cfA, cfB, cfC)
.thenApply(ignored -> {
String a = cfA.join();
String b = cfB.join();
String c = cfC.join();
return new Account(a, b, c);
}).join(); // or get(...) with timeout
我们可以在 thenApply
中使用 join,因为所有可完成的期货都在这个阶段完成。您可以修改上面的代码块以适应您的逻辑,例如更新字段而不是创建新对象。请注意,当可完成的未来异常完成时,上面的 join()
会引发异常。您可以在将未来提交给 allOf(...)
之前将可完成的未来更改为 handle()
,或者在使用 join()
:
之前询问它是否 isCompletedExceptionally()
CompletableFuture.allOf(cfA, cfB, cfC)
.thenRun(() -> {
if (!cfA.isCompletedExceptionally()) {
account.fieldA = cfA.join();
}
if (!cfB.isCompletedExceptionally()) {
account.fieldB = cfB.join();
}
if (!cfC.isCompletedExceptionally()) {
account.fieldC = cfC.join();
}
}).join(); // or get(...) with timeout
在一个完成阶段更新字段的好处是这些操作在同一个线程中完成,因此您不必担心并发修改。
我有一个相当普遍或独特的要求。例如,我有以下 AccountDetails
列表:
List<AccountDetails>
class AccountDetails {
String bankAccountId;
String mortgageAccountId;
Integer noOfTrans;
String addressLine;
String externalLink;
}
除了bankAccountId
之外的所有上述字段都是从外部REST服务调用中提取的。
我想并行调用所有 REST 服务并更新列表中的每个对象:
所以,看起来像下面这样:
每个accountDetails
- 调用抵押 REST 服务并更新
martgageAccountId
字段(REST returns MortgageInfo 对象) - 调用事务REST服务并更新
noOfTrans
字段(RESTreturnsTransactions
对象) - 调用地址REST服务并更新
addressLine
字段(RESTreturnsAddress
对象) - 调用 link REST 服务并更新
externalLink
字段。 (REST returnsLinks
对象)
我希望以上所有并行调用,并且针对列表中的每个 AcccountDetails
对象。
如果有异常,我想优雅地处理它。注意上面每个REST服务returns不同的自定义对象
我对如何使用 CompletableFuture
链接实现这一点感到困惑。
不确定 allOf
或 thenCombine
(只需要两个),或 thenCompose
应该使用以及如何将所有这些放在一起。
任何 examples/ideas?
AccountDetails accountDetails = new AccountDetails();
CompletableFuture.allOf(
CompletableFuture.
supplyAsync(() -> //CALL MORTAGE INFO REST, executor).
thenAccept(x -> {
accountDetails.setMortgageAccountId(x.getReqdField())
}).
handle(//HANDLE GRACEFULLY),
CompletableFuture.
supplyAsync(() -> //CALL SOME OTHER REST, executor).
thenAccept(x -> {
accountDetails.setNoOfTrans(x.getReqdField())
}).
handle(//HANDLE GRACEFULLY),
CompletableFuture.
supplyAsync(() -> //CALL SOME INFO REST, executor).
thenAccept(x -> {
accountDetails.setAddressLine(x.getReqdField())
}).
handle(//HANDLE GRACEFULLY),
CompletableFuture.
supplyAsync(() -> //CALL SOME OTHER REST, executor).
thenAccept(x -> {
accountDetails.setExternalLink(x.getReqdField())
}).
handle(//HANDLE GRACEFULLY),
).join();
既然你已经标记了 spring-boot
我想你会使用它并且你的服务是用 spring 框架编写的。所以我提供了一个与 spring 框架相关的答案。
首先,我创建了一个接口,用于将 rest API 实现为异步。
public interface AsyncRestCall<T> {
/** this is a hypothetical method with hypothetical params!*/
CompletableFuture<T> call(String bankAccountId);
String type();
}
然后你可以这样实现你的服务:
正如你在这个实现中看到的,我使用 MortgageRest
它代表 Mortgage
的休息服务。
@Service
public class MortgageService implements AsyncRestCall<MortgageInfo> {
private final MortgageRest mortgageRest;
@Autowired
public MortgageService(MortgageRest mortgageRest) {
this.mortgageRest = mortgageRest;
}
@Override
public CompletableFuture<MortgageInfo> call(String bankAccountId) {
return CompletableFuture.supplyAsync(() -> mortgageRest.service(bankAccountId));
}
@Override
public String type() {
return "mortgage";
}
}
按揭休息:
@Service
public class MortgageRest {
private RestTemplate restTemplate;
public MortgageRest(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
public MortgageInfo service(String bankAccountId) {
return new MortgageInfo("123455" + bankAccountId);
}
}
对于其他休息服务,请这样做。
@Service
public class TransactionService implements AsyncRestCall<Transactions> {
private final TransactionRest transactionRest;
public TransactionService(TransactionRest transactionRest) {
this.transactionRest = transactionRest;
}
@Override
public CompletableFuture<Transactions> call(String bankAccountId) {
return CompletableFuture.supplyAsync(transactionRest::service);
}
@Override
public String type() {
return "transactions";
}
}
交易休息:
@Service
public class TransactionRest {
public Transactions service() {
return new Transactions(12);
}
}
现在您需要访问所有 AsyncRestCall
实施。对于这个 porpuse 你可以声明一个 class 像这样的东西:
@Service
public class RestCallHolder {
private final List<AsyncRestCall> asyncRestCalls;
public RestCallHolder(List<AsyncRestCall> asyncRestCalls) {
this.asyncRestCalls = asyncRestCalls;
}
public List<AsyncRestCall> getAsyncRestCalls() {
return asyncRestCalls;
}
}
AccountDetailService
(你可以命名你喜欢的东西)使用 CompleteableFuture
并行调用其余服务。
在此服务中,每个 bankAccountId
休息调用将存储在 Map<String, Map<String, Object>> result = new HashMap<>();
中,外部映射键将存储 bankAccountId
值作为键,其值是他们将存储的休息服务调用在地图(内部地图)中。键是类型,值是休息呼叫响应。最后通过遍历 accountDetails 将更新其属性。
@Service
public class AccountDetailService {
private final RestCallHolder restCallHolder;
public AccountDetailService(RestCallHolder restCallHolder) {
this.restCallHolder = restCallHolder;
}
public List<AccountDetail> update(List<AccountDetail> accountDetails) {
Map<String, Map<String, Object>> result = new HashMap<>();
List<AccountDetail> finalAccountDetails = new ArrayList<>();
accountDetails.forEach(accountDetail -> {
List<CompletableFuture> futures = restCallHolder.getAsyncRestCalls()
.stream()
.map(rest -> rest.call(accountDetail.getBankAccountId()))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
.thenAccept(aVoid -> {
Map<String, Object> res = restCallHolder.getAsyncRestCalls()
.stream()
.map(rest -> new AbstractMap.SimpleEntry<>(rest.type(),
rest.call(accountDetail.getBankAccountId()).join()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
result.put(accountDetail.getBankAccountId(), res);
}
).handle((aVoid, throwable) -> {
return null; // handle the exception here
}).join();
}
);
accountDetails.forEach(accountDetail -> finalAccountDetails.add(AccountDetail.builder()
.bankAccountId(accountDetail.getBankAccountId())
.mortgageAccountId(((MortgageInfo) result.get(accountDetail.getBankAccountId()).get("mortgage")).getMortgageAccountId())
.noOfTrans(((Transactions) result.get(accountDetail.getBankAccountId()).get("transactions")).getNoOfTrans())
.build()));
return finalAccountDetails;
}
}
我会将获取字段值的责任交给模型对象本身。
这里有三种替代解决方案,使用并行流、流和执行器,以及 for 循环和执行器。
解决方案 1:
accounts.parallelStream()
.<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
account::updateAddressLine, account::updateExternalLink))
.map(RestRequest::new)
.forEach(RestRequest::run);
解决方案 2:
Executor executor = Executors.newFixedThreadPool(PARALLELISM);
accounts.stream()
.<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
account::updateAddressLine, account::updateExternalLink))
.map(RestRequest::new)
.forEach(executor::execute);
解决方案 3:
Executor executor = Executors.newFixedThreadPool(PARALLELISM);
for (AccountDetails account : accounts) {
execute(executor, account::updateMortgage);
execute(executor, account::updateNoOfTrans);
execute(executor, account::updateAddressLine);
execute(executor, account::updateExternalLink);
}
private static void execute(Executor executor, Runnable task) {
executor.execute(new RestRequest(task));
}
常用代码:
class RestRequest implements Runnable {
private final Runnable task;
RestRequest(Runnable task) {
this.task = task;
}
@Override
public void run() {
try {
task.run();
} catch (Exception e) {
// A request failed. Others will not be canceled.
}
}
}
class AccountDetails {
String bankAccountId;
String mortgageAccountId;
Integer noOfTrans;
String addressLine;
String externalLink;
void fetchMortgage() {
mortgageAccountId = MortgageService.getMortgage(bankAccountId).getAccountId();
}
void fetchNoOfTrans() {
noOfTrans = TransactionService.getTransactions(bankAccountId).getAmount();
}
void fetchAddressLine() {
addressLine = AddressService.getAddress(bankAccountId).getLine();
}
void fetchExternalLink() {
externalLink = LinkService.getLinks(bankAccountId).getExternal();
}
}
如果我简单点一下你的帐号class为:
class Account {
String fieldA;
String fieldB;
String fieldC;
Account(String fieldA, String fieldB, String fieldC) {
this.fieldA = fieldA;
this.fieldB = fieldB;
this.fieldC = fieldC;
}
}
然后您可以使用 CompletableFuture#allOf(...)
等待所有可完成的未来的结果,每个字段更新一个,然后分别从这些未来中检索结果。我们不能使用 allOf
的结果,因为它 returns 什么都没有(无效)。
Account account = CompletableFuture.allOf(cfA, cfB, cfC)
.thenApply(ignored -> {
String a = cfA.join();
String b = cfB.join();
String c = cfC.join();
return new Account(a, b, c);
}).join(); // or get(...) with timeout
我们可以在 thenApply
中使用 join,因为所有可完成的期货都在这个阶段完成。您可以修改上面的代码块以适应您的逻辑,例如更新字段而不是创建新对象。请注意,当可完成的未来异常完成时,上面的 join()
会引发异常。您可以在将未来提交给 allOf(...)
之前将可完成的未来更改为 handle()
,或者在使用 join()
:
isCompletedExceptionally()
CompletableFuture.allOf(cfA, cfB, cfC)
.thenRun(() -> {
if (!cfA.isCompletedExceptionally()) {
account.fieldA = cfA.join();
}
if (!cfB.isCompletedExceptionally()) {
account.fieldB = cfB.join();
}
if (!cfC.isCompletedExceptionally()) {
account.fieldC = cfC.join();
}
}).join(); // or get(...) with timeout
在一个完成阶段更新字段的好处是这些操作在同一个线程中完成,因此您不必担心并发修改。