CompletableFuture - 运行 多个并行的 rest 调用并得到不同的结果

CompletableFuture - Run multiple rest calls in parallel and get different result

我有一个相当普遍或独特的要求。例如,我有以下 AccountDetails 列表:

List<AccountDetails>

class AccountDetails {
    String bankAccountId;
    String mortgageAccountId;
    Integer noOfTrans;
    String addressLine;
    String externalLink;   
}

除了bankAccountId之外的所有上述字段都是从外部REST服务调用中提取的。 我想并行调用所有 REST 服务并更新列表中的每个对象:

所以,看起来像下面这样:

每个accountDetails

我希望以上所有并行调用,并且针对列表中的每个 AcccountDetails 对象。 如果有异常,我想优雅地处理它。注意上面每个REST服务returns不同的自定义对象

我对如何使用 CompletableFuture 链接实现这一点感到困惑。 不确定 allOfthenCombine(只需要两个),或 thenCompose 应该使用以及如何将所有这些放在一起。

任何 examples/ideas?

AccountDetails accountDetails = new AccountDetails();

CompletableFuture.allOf(
                        CompletableFuture.
                                supplyAsync(() -> //CALL MORTAGE INFO REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setMortgageAccountId(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME OTHER REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setNoOfTrans(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME INFO REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setAddressLine(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME OTHER REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setExternalLink(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                ).join();

既然你已经标记了 spring-boot 我想你会使用它并且你的服务是用 spring 框架编写的。所以我提供了一个与 spring 框架相关的答案。

首先,我创建了一个接口,用于将 rest API 实现为异步。

public interface AsyncRestCall<T> {
   /** this is a hypothetical method with hypothetical params!*/
   CompletableFuture<T> call(String bankAccountId); 
   String type();
}

然后你可以这样实现你的服务:

正如你在这个实现中看到的,我使用 MortgageRest 它代表 Mortgage 的休息服务。

 @Service
 public class MortgageService implements AsyncRestCall<MortgageInfo> {

   private final MortgageRest mortgageRest;

   @Autowired
   public MortgageService(MortgageRest mortgageRest) {
       this.mortgageRest = mortgageRest;
   }

   @Override
   public CompletableFuture<MortgageInfo> call(String bankAccountId) {
       return CompletableFuture.supplyAsync(() -> mortgageRest.service(bankAccountId));
    }

   @Override
   public String type() {
      return "mortgage";
   } 
} 

按揭休息:

@Service
public class MortgageRest {
  private RestTemplate restTemplate;
  public MortgageRest(RestTemplate restTemplate) {
     this.restTemplate = restTemplate;
  }
  public MortgageInfo service(String bankAccountId) {
     return new MortgageInfo("123455" + bankAccountId);
  }
}

对于其他休息服务,请这样做。

@Service
public class TransactionService implements AsyncRestCall<Transactions> {

   private final TransactionRest transactionRest;

   public TransactionService(TransactionRest transactionRest) {
      this.transactionRest = transactionRest;
   } 

   @Override
   public CompletableFuture<Transactions> call(String bankAccountId) {
       return CompletableFuture.supplyAsync(transactionRest::service);
   }

   @Override
   public String type() {
       return "transactions";
   } 
} 

交易休息:

 @Service
 public class TransactionRest {

   public Transactions service() {
       return new Transactions(12);
   }
 }

现在您需要访问所有 AsyncRestCall 实施。对于这个 porpuse 你可以声明一个 class 像这样的东西:

@Service
public class RestCallHolder {

  private final List<AsyncRestCall> asyncRestCalls;

  public RestCallHolder(List<AsyncRestCall> asyncRestCalls) {
      this.asyncRestCalls = asyncRestCalls;
  }

  public List<AsyncRestCall> getAsyncRestCalls() {
      return asyncRestCalls;
  }
}

AccountDetailService(你可以命名你喜欢的东西)使用 CompleteableFuture 并行调用其余服务。

在此服务中,每个 bankAccountId 休息调用将存储在 Map<String, Map<String, Object>> result = new HashMap<>(); 中,外部映射键将存储 bankAccountId 值作为键,其值是他们将存储的休息服务调用在地图(内部地图)中。键是类型,值是休息呼叫响应。最后通过遍历 accountDetails 将更新其属性。

@Service
public class AccountDetailService {

  private final RestCallHolder restCallHolder;

  public AccountDetailService(RestCallHolder restCallHolder) {
      this.restCallHolder = restCallHolder;
  }

  public List<AccountDetail> update(List<AccountDetail> accountDetails) {
     Map<String, Map<String, Object>> result = new HashMap<>();
     List<AccountDetail> finalAccountDetails = new ArrayList<>();

     accountDetails.forEach(accountDetail -> {
          List<CompletableFuture> futures = restCallHolder.getAsyncRestCalls()
                    .stream()
                    .map(rest -> rest.call(accountDetail.getBankAccountId()))
                    .collect(Collectors.toList());

     CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                 .thenAccept(aVoid -> { 
                    Map<String, Object> res = restCallHolder.getAsyncRestCalls()
                              .stream()
                              .map(rest -> new AbstractMap.SimpleEntry<>(rest.type(),
                                  rest.call(accountDetail.getBankAccountId()).join()))
                              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                           result.put(accountDetail.getBankAccountId(), res);
                      }
                   ).handle((aVoid, throwable) -> {
                      return null; // handle the exception here 
             }).join();
            }
    );

      accountDetails.forEach(accountDetail -> finalAccountDetails.add(AccountDetail.builder()
             .bankAccountId(accountDetail.getBankAccountId())
             .mortgageAccountId(((MortgageInfo) result.get(accountDetail.getBankAccountId()).get("mortgage")).getMortgageAccountId())
             .noOfTrans(((Transactions) result.get(accountDetail.getBankAccountId()).get("transactions")).getNoOfTrans())
             .build()));
     return finalAccountDetails;
   }
 }

我会将获取字段值的责任交给模型对象本身。
这里有三种替代解决方案,使用并行流、流和执行器,以及 for 循环和执行器。

解决方案 1:

accounts.parallelStream()
        .<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
                account::updateAddressLine, account::updateExternalLink))
        .map(RestRequest::new)
        .forEach(RestRequest::run);

解决方案 2:

Executor executor = Executors.newFixedThreadPool(PARALLELISM);
accounts.stream()
        .<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
                account::updateAddressLine, account::updateExternalLink))
        .map(RestRequest::new)
        .forEach(executor::execute);

解决方案 3:

Executor executor = Executors.newFixedThreadPool(PARALLELISM);
for (AccountDetails account : accounts) {
    execute(executor, account::updateMortgage);
    execute(executor, account::updateNoOfTrans);
    execute(executor, account::updateAddressLine);
    execute(executor, account::updateExternalLink);
}

private static void execute(Executor executor, Runnable task) {
    executor.execute(new RestRequest(task));
}

常用代码:

class RestRequest implements Runnable {
    private final Runnable task;

    RestRequest(Runnable task) {
        this.task = task;
    }

    @Override
    public void run() {
        try {
            task.run();
        } catch (Exception e) {
            // A request failed. Others will not be canceled.
        }
    }
}

class AccountDetails {
    String bankAccountId;
    String mortgageAccountId;
    Integer noOfTrans;
    String addressLine;
    String externalLink;

    void fetchMortgage() {
        mortgageAccountId = MortgageService.getMortgage(bankAccountId).getAccountId();
    }

    void fetchNoOfTrans() {
        noOfTrans = TransactionService.getTransactions(bankAccountId).getAmount();
    }

    void fetchAddressLine() {
        addressLine = AddressService.getAddress(bankAccountId).getLine();
    }

    void fetchExternalLink() {
        externalLink = LinkService.getLinks(bankAccountId).getExternal();
    }
}

如果我简单点一下你的帐号class为:

class Account {
  String fieldA;
  String fieldB;
  String fieldC;

  Account(String fieldA, String fieldB, String fieldC) {
    this.fieldA = fieldA;
    this.fieldB = fieldB;
    this.fieldC = fieldC;
  }
}

然后您可以使用 CompletableFuture#allOf(...) 等待所有可完成的未来的结果,每个字段更新一个,然后分别从这些未来中检索结果。我们不能使用 allOf 的结果,因为它 returns 什么都没有(无效)。

Account account = CompletableFuture.allOf(cfA, cfB, cfC)
    .thenApply(ignored -> {
      String a = cfA.join();
      String b = cfB.join();
      String c = cfC.join();
      return new Account(a, b, c);
    }).join(); // or get(...) with timeout

我们可以在 thenApply 中使用 join,因为所有可完成的期货都在这个阶段完成。您可以修改上面的代码块以适应您的逻辑,例如更新字段而不是创建新对象。请注意,当可完成的未来异常完成时,上面的 join() 会引发异常。您可以在将未来提交给 allOf(...) 之前将可完成的未来更改为 handle(),或者在使用 join():

之前询问它是否 isCompletedExceptionally()
CompletableFuture.allOf(cfA, cfB, cfC)
    .thenRun(() -> {
      if (!cfA.isCompletedExceptionally()) {
        account.fieldA = cfA.join();
      }
      if (!cfB.isCompletedExceptionally()) {
        account.fieldB = cfB.join();
      }
      if (!cfC.isCompletedExceptionally()) {
        account.fieldC = cfC.join();
      }
    }).join(); // or get(...) with timeout

在一个完成阶段更新字段的好处是这些操作在同一个线程中完成,因此您不必担心并发修改。