反应器中基于条件检查的异步顺序调用
Asynchronous sequential calls based on condition checks in reactor
在这里,我尝试使用反应器进行异步和非阻塞调用,对于每个请求,我可能必须依次调用两个服务(在我的例子中,getAccountInfoFromAAA
和 getAccountInfoFromBBB
).
这是我的 ItemRequest
对象:
public class ItemRequest {
private Account account;
private Result firstServiceResult;
private Result secondServiceResult;
private PostingParameterCode postingParameterCode; //enum
//...
//...
//getters and setters
}
因此,我的请求输入将包含多个 itemRequest
,并且对于每个 itemRequest
,我正在进行异步调用:
public void getAccountData(List<ItemRequest> itemRequests) {
ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
}
public Mono<ItemRequest> callBothSors(ItemRequest itemRequest) {
return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest);
//here, it will enter into a sequential call for each itemRequest
}
这是我的第一个服务调用界面:
public Mono<ItemRequest> getAccountDataFromAAA(ItemRequest itemRequest);
这是我的第二个服务调用接口:
public Mono<ItemRequest> getAccountDataFromBBB(ItemRequest itemRequest);
此方法将根据条件依次调用最多两次:
public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
Mono<ItemRequest> firstCallResult = Mono.empty();
Mono<ItemRequest> secondCallResult = Mono.empty();
if(isFirstServiceCallRequired(itemRequest)){
firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
//basically, firstService call will update the accountKey information and
//will also set the result status to OK which is required to decide
//whether to make secondService call.
} else {
//Account key is already present, so just update the result status which I need later.
Result result = new Result();
result.setStatus(Result.Status.OK);
result.setMessageText("First call not required as account info is set for item request");
itemRequest.setFirstServiceResult(result);
}
//Now, before calling the second service, I need to check the following:
if(null!= itemRequest.getFirstServiceResult() &&
itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)){
secondCallResult = this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
}
return firstCallResult.then(secondCallResult); //attaching the
//firstCallResult and secondCallResult to produce a single Mono
}
这在不需要 firstCallResult
时工作正常。但是当需要第一次调用时,这个条件检查不会通过,因为我不会更新第一次调用结果对象:
if(null != itemRequest.getFirstServiceResult() &&
itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT))) { ... }
//this condition check will not pass because first service call is not actually executing
如果我输入以下语句,这两种情况都可以正常工作:
if(isFirstServiceCallRequired(itemRequest)){
firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
firstCallResult.block(); //adding this case will work on both cases
}
但是,我认为我不会以这种方式让反应堆受益。
我想有这样的逻辑:
Mono<ItemRequest> result = firstService.call(...)
.doOnNext(/*do something */)
.then( ... secondService.call())
但无法找到将 secondService 与 firstService 链接起来以获得单声道结果并进行这些条件检查的方法。
条件检查很重要,因为我并不总是想执行第二个服务。有什么方法可以将 secondService 与 firstService 链接起来以获得结果并进行这些条件检查吗?
很抱歉问了这么长的问题。任何 suggestions/help 将不胜感激。
这里有一些消息:反应堆不是灵丹妙药! :)
每当您需要调用的响应来确定是否需要做其他事情时,这将永远无法完全并行化。例如。你总是可以做你最后的建议。但是,这并不意味着使用 Reactor 不会给您带来任何好处!
您获得的一些好处:
- 您正在后台使用 Netty 而不是 Servlet,这有助于避免锁定 I/O 操作。这可以更好地分配资源,使您的系统更具弹性。
- 您可以在等待响应的同时进行其他操作。如果您有事情要做,顺序无关紧要,您可以随时将它们放在那里(例如审计、日志记录等)。
我希望这能回答你的问题:)
在为这个问题提供赏金点之后,我真的很兴奋,期待一些答案。
但无论如何,我能够改进我的初始解决方案并进行这些条件检查。
我做了以下事情:
我在两个服务调用中将 return 类型从 Mono<ItemRequest>
更改为 Mono<Void>
,因为我基本上是将数据更新为 ItemRequest
list:
这里处理并行调用(每个并行调用都有一个顺序调用):
public void getAccountData(List<ItemRequest> itemRequests) {
ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
}
public Mono<Void> callBothSors(ItemRequest itemRequest) {
return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest);
//here, it will enter into a sequential call for each itemRequest
}
这些是我的 firstServiceCall
和 secondServiceCall
界面更改:
public Mono<Void> getAccountDataFromAAA(ItemRequest itemRequest);
public Mono<Void> getAccountDataFromBBB(ItemRequest itemRequest);
并且我将 secondServiceCall
与 firstServiceCall
链接起来以获得单声道结果并进行这些条件检查:
public Mono<Void> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
Mono<Void> callSequence = Mono.empty();
if(isFirstServiceCallRequired(itemRequest)){
callSequence = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
} else {
//Account key is already present, so just update the result status which I need later.
Result result = new Result();
result.setStatus(Result.Status.OK);
result.setMessageText("First call not required as account info is set for item request");
itemRequest.setFirstServiceResult(result);
}
return callSequence.thenEmpty(Mono.defer(() -> {
//note: Mono.defer ==>> Create a Mono provider that will supply a target Mono to subscribe to
//for each subscriber downstream.
//only if the firstServiceCall result is successful & other condition check successful,
// I am calling secondServiceCall:
if(shouldCallSecondService(itemRequest)){
return this.secondServiceCallImpl.getAccountDataFromAAAandBBB(itemRequest);
} else {
return Mono.empty();
}
}))
public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest) {
Mono<ItemRequest> firstCallResult = Mono.empty();
Mono<ItemRequest> secondCallResult = Mono.empty();
if (isFirstServiceCallRequired(itemRequest)) {
firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
//basically, firstService call will update the accountKey information and
//will also set the result status to OK which is required to decide
//whether to make secondService call.
} else {
/*Account key is already present, so just update the result status which I need
later.*/
firstCallResult = Mono.defer(() -> {
Result result = new Result();
result.setStatus(Result.Status.OK);
result.setMessageText("First call not required as account info is set for item request");
itemRequest.setFirstServiceResult(result);
return Mono.just(itemRequest);
});
}
return firstCallResult.flatMap(itReq -> {
//Now, before calling the second service, I need to check the following:
if (null != itemRequest.getFirstServiceResult() &&
itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)) {
return secondCallResult = this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
} else {
return itReq;
}
});
}
下一个简单的例子可以帮助你flatMap理解:
public static void main(String[] args) {
callExternalServiceA.flatMap(response -> {
if(response.equals("200")){
return Mono.just(response);
} else {
return callExtertnalServiceB();
}
}).block();
}
public static Mono<String> callExtertnalServiceA() {
return Mono.defer(() -> {
System.out.println("Call external service A");
return Mono.just("400");
});
}
public static Mono<String> callExtertnalServiceB() {
return Mono.defer(() -> {
System.out.println("Call external service B");
return Mono.just("200");
});
}
在这里,我尝试使用反应器进行异步和非阻塞调用,对于每个请求,我可能必须依次调用两个服务(在我的例子中,getAccountInfoFromAAA
和 getAccountInfoFromBBB
).
这是我的 ItemRequest
对象:
public class ItemRequest {
private Account account;
private Result firstServiceResult;
private Result secondServiceResult;
private PostingParameterCode postingParameterCode; //enum
//...
//...
//getters and setters
}
因此,我的请求输入将包含多个 itemRequest
,并且对于每个 itemRequest
,我正在进行异步调用:
public void getAccountData(List<ItemRequest> itemRequests) {
ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
}
public Mono<ItemRequest> callBothSors(ItemRequest itemRequest) {
return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest);
//here, it will enter into a sequential call for each itemRequest
}
这是我的第一个服务调用界面:
public Mono<ItemRequest> getAccountDataFromAAA(ItemRequest itemRequest);
这是我的第二个服务调用接口:
public Mono<ItemRequest> getAccountDataFromBBB(ItemRequest itemRequest);
此方法将根据条件依次调用最多两次:
public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
Mono<ItemRequest> firstCallResult = Mono.empty();
Mono<ItemRequest> secondCallResult = Mono.empty();
if(isFirstServiceCallRequired(itemRequest)){
firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
//basically, firstService call will update the accountKey information and
//will also set the result status to OK which is required to decide
//whether to make secondService call.
} else {
//Account key is already present, so just update the result status which I need later.
Result result = new Result();
result.setStatus(Result.Status.OK);
result.setMessageText("First call not required as account info is set for item request");
itemRequest.setFirstServiceResult(result);
}
//Now, before calling the second service, I need to check the following:
if(null!= itemRequest.getFirstServiceResult() &&
itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)){
secondCallResult = this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
}
return firstCallResult.then(secondCallResult); //attaching the
//firstCallResult and secondCallResult to produce a single Mono
}
这在不需要 firstCallResult
时工作正常。但是当需要第一次调用时,这个条件检查不会通过,因为我不会更新第一次调用结果对象:
if(null != itemRequest.getFirstServiceResult() &&
itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT))) { ... }
//this condition check will not pass because first service call is not actually executing
如果我输入以下语句,这两种情况都可以正常工作:
if(isFirstServiceCallRequired(itemRequest)){
firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
firstCallResult.block(); //adding this case will work on both cases
}
但是,我认为我不会以这种方式让反应堆受益。 我想有这样的逻辑:
Mono<ItemRequest> result = firstService.call(...)
.doOnNext(/*do something */)
.then( ... secondService.call())
但无法找到将 secondService 与 firstService 链接起来以获得单声道结果并进行这些条件检查的方法。 条件检查很重要,因为我并不总是想执行第二个服务。有什么方法可以将 secondService 与 firstService 链接起来以获得结果并进行这些条件检查吗?
很抱歉问了这么长的问题。任何 suggestions/help 将不胜感激。
这里有一些消息:反应堆不是灵丹妙药! :)
每当您需要调用的响应来确定是否需要做其他事情时,这将永远无法完全并行化。例如。你总是可以做你最后的建议。但是,这并不意味着使用 Reactor 不会给您带来任何好处!
您获得的一些好处:
- 您正在后台使用 Netty 而不是 Servlet,这有助于避免锁定 I/O 操作。这可以更好地分配资源,使您的系统更具弹性。
- 您可以在等待响应的同时进行其他操作。如果您有事情要做,顺序无关紧要,您可以随时将它们放在那里(例如审计、日志记录等)。
我希望这能回答你的问题:)
在为这个问题提供赏金点之后,我真的很兴奋,期待一些答案。 但无论如何,我能够改进我的初始解决方案并进行这些条件检查。
我做了以下事情:
我在两个服务调用中将 return 类型从 Mono<ItemRequest>
更改为 Mono<Void>
,因为我基本上是将数据更新为 ItemRequest
list:
这里处理并行调用(每个并行调用都有一个顺序调用):
public void getAccountData(List<ItemRequest> itemRequests) {
ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
}
public Mono<Void> callBothSors(ItemRequest itemRequest) {
return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest);
//here, it will enter into a sequential call for each itemRequest
}
这些是我的 firstServiceCall
和 secondServiceCall
界面更改:
public Mono<Void> getAccountDataFromAAA(ItemRequest itemRequest);
public Mono<Void> getAccountDataFromBBB(ItemRequest itemRequest);
并且我将 secondServiceCall
与 firstServiceCall
链接起来以获得单声道结果并进行这些条件检查:
public Mono<Void> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
Mono<Void> callSequence = Mono.empty();
if(isFirstServiceCallRequired(itemRequest)){
callSequence = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
} else {
//Account key is already present, so just update the result status which I need later.
Result result = new Result();
result.setStatus(Result.Status.OK);
result.setMessageText("First call not required as account info is set for item request");
itemRequest.setFirstServiceResult(result);
}
return callSequence.thenEmpty(Mono.defer(() -> {
//note: Mono.defer ==>> Create a Mono provider that will supply a target Mono to subscribe to
//for each subscriber downstream.
//only if the firstServiceCall result is successful & other condition check successful,
// I am calling secondServiceCall:
if(shouldCallSecondService(itemRequest)){
return this.secondServiceCallImpl.getAccountDataFromAAAandBBB(itemRequest);
} else {
return Mono.empty();
}
}))
public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest) {
Mono<ItemRequest> firstCallResult = Mono.empty();
Mono<ItemRequest> secondCallResult = Mono.empty();
if (isFirstServiceCallRequired(itemRequest)) {
firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
//basically, firstService call will update the accountKey information and
//will also set the result status to OK which is required to decide
//whether to make secondService call.
} else {
/*Account key is already present, so just update the result status which I need
later.*/
firstCallResult = Mono.defer(() -> {
Result result = new Result();
result.setStatus(Result.Status.OK);
result.setMessageText("First call not required as account info is set for item request");
itemRequest.setFirstServiceResult(result);
return Mono.just(itemRequest);
});
}
return firstCallResult.flatMap(itReq -> {
//Now, before calling the second service, I need to check the following:
if (null != itemRequest.getFirstServiceResult() &&
itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)) {
return secondCallResult = this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
} else {
return itReq;
}
});
}
下一个简单的例子可以帮助你flatMap理解:
public static void main(String[] args) {
callExternalServiceA.flatMap(response -> {
if(response.equals("200")){
return Mono.just(response);
} else {
return callExtertnalServiceB();
}
}).block();
}
public static Mono<String> callExtertnalServiceA() {
return Mono.defer(() -> {
System.out.println("Call external service A");
return Mono.just("400");
});
}
public static Mono<String> callExtertnalServiceB() {
return Mono.defer(() -> {
System.out.println("Call external service B");
return Mono.just("200");
});
}