反应器中基于条件检查的异步顺序调用

Asynchronous sequential calls based on condition checks in reactor

在这里,我尝试使用反应器进行异步和非阻塞调用,对于每个请求,我可能必须依次调用两个服务(在我的例子中,getAccountInfoFromAAAgetAccountInfoFromBBB).

这是我的 ItemRequest 对象:

public class ItemRequest {
    private Account account;
    private Result firstServiceResult;
    private Result secondServiceResult;
    private PostingParameterCode postingParameterCode; //enum 
    //...
    //...
    //getters and setters
}

因此,我的请求输入将包含多个 itemRequest,并且对于每个 itemRequest,我正在进行异步调用:

public void getAccountData(List<ItemRequest> itemRequests) {
    ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
    Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
}

public Mono<ItemRequest> callBothSors(ItemRequest itemRequest) {
    return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest); 
    //here, it will enter into a sequential call for each itemRequest
}

这是我的第一个服务调用界面:

public Mono<ItemRequest> getAccountDataFromAAA(ItemRequest itemRequest);

这是我的第二个服务调用接口:

public Mono<ItemRequest> getAccountDataFromBBB(ItemRequest itemRequest);

此方法将根据条件依次调用最多两次:

public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
    Mono<ItemRequest> firstCallResult = Mono.empty();
    Mono<ItemRequest> secondCallResult = Mono.empty();

if(isFirstServiceCallRequired(itemRequest)){
    firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest); 
//basically, firstService call will update the accountKey information and
//will also set the result status to OK which is required to decide 
//whether to make secondService call.
} else {
    //Account key is already present, so just update the result status which I need later.
    Result result = new Result();
    result.setStatus(Result.Status.OK);
    result.setMessageText("First call not required as account info is set for item request");
    itemRequest.setFirstServiceResult(result);
}

//Now, before calling the second service, I need to check the following:

if(null!= itemRequest.getFirstServiceResult() && 
    itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) && 
    itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)){ 
        secondCallResult = this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
    }

    return firstCallResult.then(secondCallResult);  //attaching the
    //firstCallResult and secondCallResult to produce a single Mono

}

这在不需要 firstCallResult 时工作正常。但是当需要第一次调用时,这个条件检查不会通过,因为我不会更新第一次调用结果对象:

if(null != itemRequest.getFirstServiceResult() && 
    itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) && 
    itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT))) { ... } 
 //this condition check will not pass because first service call is not actually executing

如果我输入以下语句,这两种情况都可以正常工作:

if(isFirstServiceCallRequired(itemRequest)){
        firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest); 
        firstCallResult.block(); //adding this case will work on both cases 
    }

但是,我认为我不会以这种方式让反应堆受益。 我想有这样的逻辑:

Mono<ItemRequest> result = firstService.call(...)
    .doOnNext(/*do something */)
    .then( ... secondService.call())

但无法找到将 secondService 与 firstService 链接起来以获得单声道结果​​并进行这些条件检查的方法。 条件检查很重要,因为我并不总是想执行第二个服务。有什么方法可以将 secondService 与 firstService 链接起来以获得结果并进行这些条件检查吗?

很抱歉问了这么长的问题。任何 suggestions/help 将不胜感激。

这里有一些消息:反应堆不是灵丹妙药! :)

每当您需要调用的响应来确定是否需要做其他事情时,这将永远无法完全并行化。例如。你总是可以做你最后的建议。但是,这并不意味着使用 Reactor 不会给您带来任何好处!

您获得的一些好处:

  • 您正在后台使用 Netty 而不是 Servlet,这有助于避免锁定 I/O 操作。这可以更好地分配资源,使您的系统更具弹性。
  • 您可以在等待响应的同时进行其他操作。如果您有事情要做,顺序无关紧要,您可以随时将它们放在那里(例如审计、日志记录等)。

我希望这能回答你的问题:)

在为这个问题提供赏金点之后,我真的很兴奋,期待一些答案。 但无论如何,我能够改进我的初始解决方案并进行这些条件检查。

我做了以下事情: 我在两个服务调用中将 return 类型从 Mono<ItemRequest> 更改为 Mono<Void>,因为我基本上是将数据更新为 ItemRequest list:

这里处理并行调用(每个并行调用都有一个顺序调用):

public void getAccountData(List<ItemRequest> itemRequests) {
        ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
        Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
    }

    public Mono<Void> callBothSors(ItemRequest itemRequest) {
        return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest);
        //here, it will enter into a sequential call for each itemRequest
    }

这些是我的 firstServiceCallsecondServiceCall 界面更改:

public Mono<Void> getAccountDataFromAAA(ItemRequest itemRequest);

public Mono<Void> getAccountDataFromBBB(ItemRequest itemRequest);

并且我将 secondServiceCallfirstServiceCall 链接起来以获得单声道结果​​并进行这些条件检查:

public Mono<Void> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
    Mono<Void> callSequence = Mono.empty();

    if(isFirstServiceCallRequired(itemRequest)){
        callSequence = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
    } else {
        //Account key is already present, so just update the result status which I need later.
        Result result = new Result();
        result.setStatus(Result.Status.OK);
        result.setMessageText("First call not required as account info is set for item request");
        itemRequest.setFirstServiceResult(result);
    }

    return callSequence.thenEmpty(Mono.defer(() -> {
        //note: Mono.defer ==>> Create a Mono provider that will supply a target Mono to subscribe to 
        //for each subscriber downstream.
        //only if the firstServiceCall result is successful & other condition check successful,
        // I am calling secondServiceCall:  
        if(shouldCallSecondService(itemRequest)){
            return this.secondServiceCallImpl.getAccountDataFromAAAandBBB(itemRequest);
        } else {
            return Mono.empty();
        }
    }))
public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest) {
  Mono<ItemRequest> firstCallResult = Mono.empty();
  Mono<ItemRequest> secondCallResult = Mono.empty();

  if (isFirstServiceCallRequired(itemRequest)) {
    firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
    //basically, firstService call will update the accountKey information and
    //will also set the result status to OK which is required to decide
    //whether to make secondService call.
  } else {
  /*Account key is already present, so just update the result status which I need 
  later.*/
    firstCallResult = Mono.defer(() -> {
      Result result = new Result();
      result.setStatus(Result.Status.OK);
      result.setMessageText("First call not required as account info is set for item request");
      itemRequest.setFirstServiceResult(result);
      return Mono.just(itemRequest);
    });
  }

  return firstCallResult.flatMap(itReq -> {
    //Now, before calling the second service, I need to check the following:
    if (null != itemRequest.getFirstServiceResult() &&
        itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
      itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)) {
        return secondCallResult = this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
  } else {
    return itReq;
  }
  });
}

下一个简单的例子可以帮助你flatMap理解:

public static void main(String[] args) {

  callExternalServiceA.flatMap(response -> {
    if(response.equals("200")){
      return Mono.just(response);
    } else {
      return callExtertnalServiceB();
    }
  }).block();

}

public static Mono<String> callExtertnalServiceA() {
  return Mono.defer(() -> {
    System.out.println("Call external service A");
    return Mono.just("400");
  });
}

public static Mono<String> callExtertnalServiceB() {
  return Mono.defer(() -> {
    System.out.println("Call external service B");
    return Mono.just("200");
  });
}