Reactor Mono - 执行并行任务
Reactor Mono - execute parallel tasks
我是 Reactor 框架的新手,正在尝试在我们现有的一个实现中使用它。LocationProfileService 和 InventoryService 都返回一个 Mono,它们由 MainService 调用,彼此互不依赖,应当并行执行。在 LocationProfileService 中会发出 4 个查询,其中最后两个查询依赖于第一个查询的结果。
有没有更好的写法?我观察到这些调用是按顺序执行的,而其中一些本应并行执行。正确的做法是什么?
// Question code, reproduced as posted: the query arguments are elided with "//..." placeholders,
// so the snippet is illustrative and does not compile as written.
public class LocationProfileService {
// Cache looked up by customerId in getCustomerAccount below; the declaration is left unfinished here.
static final Cache<String, String> customerIdCache //define Cache
// Builds a LocationProfileInfo from three zipped results: location profile, customer, and location pricing.
@Override
public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
//These 2 are not interdependent and can be executed immediately
// NOTE(review): getCustomerArNumber is not defined in this snippet (the helper below is getCustomerAccount),
// and the stray "LocationNumber)" leaves the parentheses unbalanced - presumably paste residue.
Mono<String> customerAccountMono = getCustomerArNumber(customerId,location) LocationNumber).subscribeOn(Schedulers.parallel()).switchIfEmpty(Mono.error(new CustomerNotFoundException(location, customerId))).log();
Mono<LocationProfile> locationProfileMono = Mono.fromFuture(//location query).subscribeOn(Schedulers.parallel()).log();
//Should block be called, or is there a better way to do ?
// NOTE(review): this block() call is the point the question is about - the method waits here for the
// account value before the two dependent queries below are declared.
String custAccount = customerAccountMono.block(); // This is needed to execute and the value from this is needed for the next 2 calls
Mono<Customer> customerMono = Mono.fromFuture(//query uses custAccount from earlier step).subscribeOn(Schedulers.parallel()).log();
Mono<Result<LocationPricing>> locationPricingMono = Mono.fromFuture(//query uses custAccount from earlier step).subscribeOn(Schedulers.parallel()).log();
// Combine the three results into a single LocationProfileInfo.
return Mono.zip(locationProfileMono,customerMono,locationPricingMono).flatMap(tuple -> {
LocationProfileInfo locationProfileInfo = new LocationProfileInfo();
//populate values from tuple
return Mono.just(locationProfileInfo);
});
}
// Looks up customerId in customerIdCache; on a cache miss it falls back to the elided query and maps
// the query result to its account number.
// NOTE(review): conversationId and location are not referenced in the visible part of the body.
private Mono<String> getCustomerAccount(String conversationId, String customerId, String location) {
return CacheMono.lookup((Map)customerIdCache.asMap(),customerId).onCacheMissResume(Mono.fromFuture(//query).subscribeOn(Schedulers.parallel()).map(x -> x.getAccountNumber()));
}
}
// Question code, reproduced as posted: the query arguments are elided with "//..." placeholders,
// so the snippet is illustrative and does not compile as written.
public class InventoryService {
// Builds an InventoryInfo from two queries (inventory and inventory sale) whose results are zipped together.
@Override
public Mono<InventoryInfo> getInventoryInfo(String inventoryId) {
// Neither query uses the other's result; each is declared with subscribeOn(Schedulers.parallel()).
Mono<Inventory> inventoryMono = Mono.fromFuture(//inventory query).subscribeOn(Schedulers.parallel()).log();
Mono<List<InventorySale>> isMono = Mono.fromFuture(//inventory sale query).subscribeOn(Schedulers.parallel()).log();
// Combine both results into a single InventoryInfo.
return Mono.zip(inventoryMono,isMono).flatMap(tuple -> {
InventoryInfo inventoryInfo = new InventoryInfo();
//populate value from tuple
return Mono.just(inventoryInfo);
});
}
}
// Question code, reproduced as posted: call arguments are elided as "....", so it does not compile as written.
// Entry point that requests the location profile info and the inventory info and waits for both.
public class MainService {
@Autowired
LocationProfileService locationProfileService;
// NOTE(review): the field declaration below is missing its terminating semicolon in the original post.
@Autowired
InventoryService inventoryService
// Zips the two service results and blocks until the zipped Mono completes; the combined value is discarded.
public void mainService(String customerId, String location, String inventoryId) {
Mono<LocationProfileInfo> locationProfileMono = locationProfileService.getProfileInfoByLocationAndCustomer(....);
Mono<InventoryInfo> inventoryMono = inventoryService.getInventoryInfo(....);
//is using block fine or is there a better way to do?
Mono.zip(locationProfileMono,inventoryMono).subscribeOn(Schedulers.parallel()).block();
}
}
您无需阻塞就能把该值传递给后续调用,您的代码已经非常接近解决方案了。我使用您提供的类名编写了下面的代码,只需将所有 Mono.just(....) 替换为对相应服务的调用即可。
/**
 * Assembles a {@link LocationProfileInfo} in two zip stages without calling {@code block()}.
 *
 * <p>Stage one pairs the customer account with the location profile. Stage two, inside
 * {@code flatMap}, uses the account to create the customer and pricing sources and zips them
 * with the already-available location profile. Every {@code Mono.just(...)} is a placeholder
 * for the corresponding service call.
 *
 * @param customerId customer identifier (not used by the placeholder sources)
 * @param location   location identifier (not used by the placeholder sources)
 * @return a Mono emitting the info object built from the final three-element tuple
 */
public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
    // Stage one: the two sources that do not depend on each other.
    Mono<String> accountSource = Mono.just("customerAccount");
    Mono<LocationProfile> profileSource = Mono.just(new LocationProfile());

    return Mono.zip(accountSource, profileSource)
            .flatMap(firstStage -> {
                // Stage two: the account value (T1) is available here, so the dependent sources can be created.
                Mono<Customer> customerSource = Mono.just(new Customer(firstStage.getT1()));
                Mono<Result<LocationPricing>> pricingSource = Mono.just(new Result<LocationPricing>());
                // Re-wrap the profile obtained in stage one so it can take part in the second zip.
                Mono<LocationProfile> resolvedProfile = Mono.just(firstStage.getT2());
                return Mono.zip(customerSource, pricingSource, resolvedProfile);
            })
            .map(secondStage -> new LocationProfileInfo(secondStage));
}
// Example result type; its constructor receives the zipped (customer, location pricing, location profile) tuple.
public static class LocationProfileInfo {
public LocationProfileInfo(Tuple3<Customer, Result<LocationPricing>, LocationProfile> tuple){
// do whatever is needed here, e.g. populate this object from the tuple
}
}
// Empty stub for the location profile type used by the example.
public static class LocationProfile {}
// Stub customer type; the constructor accepts the customer account string and does nothing with it.
private static class Customer {
public Customer(String cutomerAccount) {
}
}
// Empty generic stub for the query result wrapper used by the example.
private static class Result<T> {}
// Empty stub for the location pricing type used by the example.
private static class LocationPricing {}
请注意,第一个 zip 并不是必需的,我那样写只是为了贴近您原来的方案。如果是我,会用下面这种不同的方式来解决这个问题,这样会更清晰。
/**
 * Assembles a {@link LocationProfileInfo} with a single zip, without calling {@code block()}.
 *
 * <p>The customer account is obtained first. Inside {@code flatMap} the customer, location
 * pricing and location profile sources are declared and zipped into one tuple, which is then
 * mapped to the result object. Every {@code Mono.just(...)} is a placeholder for the
 * corresponding service call.
 *
 * @param customerId customer identifier (not used by the placeholder sources)
 * @param location   location identifier (not used by the placeholder sources)
 * @return a Mono emitting the info object built from the zipped tuple
 */
public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
    // Placeholder for the account lookup; replace with the real service call.
    Mono<String> accountLookup = Mono.just("customerAccount");

    return accountLookup
            .flatMap(account -> {
                // Source for the customer, which needs the account value.
                Mono<Customer> customerSource = Mono.just(new Customer(account));
                // Source for the location pricing.
                Mono<Result<LocationPricing>> pricingSource = Mono.just(new Result<LocationPricing>());
                // Source for the location profile.
                Mono<LocationProfile> profileSource = Mono.just(new LocationProfile());
                // The three sources are only declared above; zip combines them into one Tuple3.
                return Mono.zip(customerSource, pricingSource, profileSource);
            })
            .map(zipped -> new LocationProfileInfo(zipped));
}
我是 Reactor 框架的新手,正在尝试在我们现有的一个实现中使用它。LocationProfileService 和 InventoryService 都返回一个 Mono,它们由 MainService 调用,彼此互不依赖,应当并行执行。在 LocationProfileService 中会发出 4 个查询,其中最后两个查询依赖于第一个查询的结果。
有没有更好的写法?我观察到这些调用是按顺序执行的,而其中一些本应并行执行。正确的做法是什么?
// Question code, reproduced as posted: the query arguments are elided with "//..." placeholders,
// so the snippet is illustrative and does not compile as written.
public class LocationProfileService {
// Cache looked up by customerId in getCustomerAccount below; the declaration is left unfinished here.
static final Cache<String, String> customerIdCache //define Cache
// Builds a LocationProfileInfo from three zipped results: location profile, customer, and location pricing.
@Override
public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
//These 2 are not interdependent and can be executed immediately
// NOTE(review): getCustomerArNumber is not defined in this snippet (the helper below is getCustomerAccount),
// and the stray "LocationNumber)" leaves the parentheses unbalanced - presumably paste residue.
Mono<String> customerAccountMono = getCustomerArNumber(customerId,location) LocationNumber).subscribeOn(Schedulers.parallel()).switchIfEmpty(Mono.error(new CustomerNotFoundException(location, customerId))).log();
Mono<LocationProfile> locationProfileMono = Mono.fromFuture(//location query).subscribeOn(Schedulers.parallel()).log();
//Should block be called, or is there a better way to do ?
// NOTE(review): this block() call is the point the question is about - the method waits here for the
// account value before the two dependent queries below are declared.
String custAccount = customerAccountMono.block(); // This is needed to execute and the value from this is needed for the next 2 calls
Mono<Customer> customerMono = Mono.fromFuture(//query uses custAccount from earlier step).subscribeOn(Schedulers.parallel()).log();
Mono<Result<LocationPricing>> locationPricingMono = Mono.fromFuture(//query uses custAccount from earlier step).subscribeOn(Schedulers.parallel()).log();
// Combine the three results into a single LocationProfileInfo.
return Mono.zip(locationProfileMono,customerMono,locationPricingMono).flatMap(tuple -> {
LocationProfileInfo locationProfileInfo = new LocationProfileInfo();
//populate values from tuple
return Mono.just(locationProfileInfo);
});
}
// Looks up customerId in customerIdCache; on a cache miss it falls back to the elided query and maps
// the query result to its account number.
// NOTE(review): conversationId and location are not referenced in the visible part of the body.
private Mono<String> getCustomerAccount(String conversationId, String customerId, String location) {
return CacheMono.lookup((Map)customerIdCache.asMap(),customerId).onCacheMissResume(Mono.fromFuture(//query).subscribeOn(Schedulers.parallel()).map(x -> x.getAccountNumber()));
}
}
// Question code, reproduced as posted: the query arguments are elided with "//..." placeholders,
// so the snippet is illustrative and does not compile as written.
public class InventoryService {
// Builds an InventoryInfo from two queries (inventory and inventory sale) whose results are zipped together.
@Override
public Mono<InventoryInfo> getInventoryInfo(String inventoryId) {
// Neither query uses the other's result; each is declared with subscribeOn(Schedulers.parallel()).
Mono<Inventory> inventoryMono = Mono.fromFuture(//inventory query).subscribeOn(Schedulers.parallel()).log();
Mono<List<InventorySale>> isMono = Mono.fromFuture(//inventory sale query).subscribeOn(Schedulers.parallel()).log();
// Combine both results into a single InventoryInfo.
return Mono.zip(inventoryMono,isMono).flatMap(tuple -> {
InventoryInfo inventoryInfo = new InventoryInfo();
//populate value from tuple
return Mono.just(inventoryInfo);
});
}
}
// Question code, reproduced as posted: call arguments are elided as "....", so it does not compile as written.
// Entry point that requests the location profile info and the inventory info and waits for both.
public class MainService {
@Autowired
LocationProfileService locationProfileService;
// NOTE(review): the field declaration below is missing its terminating semicolon in the original post.
@Autowired
InventoryService inventoryService
// Zips the two service results and blocks until the zipped Mono completes; the combined value is discarded.
public void mainService(String customerId, String location, String inventoryId) {
Mono<LocationProfileInfo> locationProfileMono = locationProfileService.getProfileInfoByLocationAndCustomer(....);
Mono<InventoryInfo> inventoryMono = inventoryService.getInventoryInfo(....);
//is using block fine or is there a better way to do?
Mono.zip(locationProfileMono,inventoryMono).subscribeOn(Schedulers.parallel()).block();
}
}
您无需阻塞就能把该值传递给后续调用,您的代码已经非常接近解决方案了。我使用您提供的类名编写了下面的代码,只需将所有 Mono.just(....) 替换为对相应服务的调用即可。
/**
 * Assembles a {@link LocationProfileInfo} in two zip stages without calling {@code block()}.
 *
 * <p>Stage one pairs the customer account with the location profile. Stage two, inside
 * {@code flatMap}, uses the account to create the customer and pricing sources and zips them
 * with the already-available location profile. Every {@code Mono.just(...)} is a placeholder
 * for the corresponding service call.
 *
 * @param customerId customer identifier (not used by the placeholder sources)
 * @param location   location identifier (not used by the placeholder sources)
 * @return a Mono emitting the info object built from the final three-element tuple
 */
public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
    // Stage one: the two sources that do not depend on each other.
    Mono<String> accountSource = Mono.just("customerAccount");
    Mono<LocationProfile> profileSource = Mono.just(new LocationProfile());

    return Mono.zip(accountSource, profileSource)
            .flatMap(firstStage -> {
                // Stage two: the account value (T1) is available here, so the dependent sources can be created.
                Mono<Customer> customerSource = Mono.just(new Customer(firstStage.getT1()));
                Mono<Result<LocationPricing>> pricingSource = Mono.just(new Result<LocationPricing>());
                // Re-wrap the profile obtained in stage one so it can take part in the second zip.
                Mono<LocationProfile> resolvedProfile = Mono.just(firstStage.getT2());
                return Mono.zip(customerSource, pricingSource, resolvedProfile);
            })
            .map(secondStage -> new LocationProfileInfo(secondStage));
}
// Example result type; its constructor receives the zipped (customer, location pricing, location profile) tuple.
public static class LocationProfileInfo {
public LocationProfileInfo(Tuple3<Customer, Result<LocationPricing>, LocationProfile> tuple){
// do whatever is needed here, e.g. populate this object from the tuple
}
}
// Empty stub for the location profile type used by the example.
public static class LocationProfile {}
// Stub customer type; the constructor accepts the customer account string and does nothing with it.
private static class Customer {
public Customer(String cutomerAccount) {
}
}
// Empty generic stub for the query result wrapper used by the example.
private static class Result<T> {}
// Empty stub for the location pricing type used by the example.
private static class LocationPricing {}
请注意,第一个 zip 并不是必需的,我那样写只是为了贴近您原来的方案。如果是我,会用下面这种不同的方式来解决这个问题,这样会更清晰。
/**
 * Assembles a {@link LocationProfileInfo} with a single zip, without calling {@code block()}.
 *
 * <p>The customer account is obtained first. Inside {@code flatMap} the customer, location
 * pricing and location profile sources are declared and zipped into one tuple, which is then
 * mapped to the result object. Every {@code Mono.just(...)} is a placeholder for the
 * corresponding service call.
 *
 * @param customerId customer identifier (not used by the placeholder sources)
 * @param location   location identifier (not used by the placeholder sources)
 * @return a Mono emitting the info object built from the zipped tuple
 */
public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
    // Placeholder for the account lookup; replace with the real service call.
    Mono<String> accountLookup = Mono.just("customerAccount");

    return accountLookup
            .flatMap(account -> {
                // Source for the customer, which needs the account value.
                Mono<Customer> customerSource = Mono.just(new Customer(account));
                // Source for the location pricing.
                Mono<Result<LocationPricing>> pricingSource = Mono.just(new Result<LocationPricing>());
                // Source for the location profile.
                Mono<LocationProfile> profileSource = Mono.just(new LocationProfile());
                // The three sources are only declared above; zip combines them into one Tuple3.
                return Mono.zip(customerSource, pricingSource, profileSource);
            })
            .map(zipped -> new LocationProfileInfo(zipped));
}