How to execute Hystrix commands in parallel?
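A method of a Spring service in my application makes requests to two other microservices. I want to make these requests through Hystrix so that they are fault tolerant, and I want to run them in parallel.
So far I have implemented a HystrixObservableCommand for each call, and I use a CountDownLatch to wait until both commands have completed (or failed).
The current solution looks very verbose. Is it possible to execute the Hystrix commands in parallel using Observable features?
The desired solution would look like this in pseudocode: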
    final Observable<CustomerDTO> customerObservable =
            new LoadCustomerObservableCommand(customerClient, customerId).toObservable()
                    .doOnError(throwable -> log.error("Failed to retrieve customer {} information for the reservation {}", customerId, reservationId, throwable))
                    .doOnNext(customer -> myResponse.setCustomer(customer));

    final Observable<List<TicketDTO>> ticketsObservable =
            new GetTicketsObservableCommand(ticketsClient, reservationId).toObservable()
                    .doOnError(throwable -> log.error("Failed to retrieve tickets for the reservation {}", reservationId, throwable))
                    .doOnNext(tickets -> myResponse.setTickets(tickets));

    final AtomicBoolean subRequestsFailed = new AtomicBoolean(false);
    Observable.zip(customerObservable, ticketsObservable, (customer, tickets) -> null)
            .doOnError(throwable -> subRequestsFailed.set(true))
            .toBlocking()
            .first();

    if (subRequestsFailed.get()) {
        throw new HystrixBadRequestException("One or more requests to submodules have failed");
    }
    return myResponse;
Unfortunately, this desired solution does not work, because the Hystrix commands are never executed.
My current solution is:
    // execute requests to sub modules in parallel
    final CountDownLatch cdl = new CountDownLatch(2);
    // the callbacks may run on different threads, so the list must be thread-safe
    final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());

    // load customer information
    final Observable<CustomerDTO> customerObservable = customerRxClient.loadCustomer(customerId);
    customerObservable
            .doOnError(throwable -> {
                log.error("Failed to retrieve customer {} information for the reservation {}", customerId, reservationId, throwable);
                // record the failure before releasing the latch so the waiting thread sees it
                failures.add(throwable);
                cdl.countDown();
            })
            .doOnCompleted(cdl::countDown)
            .subscribe(customer -> {
                dto.getReservationOwner().setBirthday(customer.getBirthday());
                dto.getReservationOwner().setCustomerId(customer.getCustomerId());
                dto.getReservationOwner().setCitizenship(customer.getCitizenship());
                dto.getReservationOwner().setEmail(customer.getEmail());
                dto.getReservationOwner().setFirstName(customer.getFirstName());
                dto.getReservationOwner().setGender(customer.getGender());
                dto.getReservationOwner().setLastName(customer.getLastName());
                dto.getReservationOwner().setPhone(ofNullable(customer.getPhone()).map(v -> mappingService.map(v, PhoneDTO.class)).orElse(null));
            });

    // load tickets
    final Observable<List<TicketDTO>> ticketsObservable = ticketsClient.getTickets(reservationId);
    ticketsObservable
            .doOnError(throwable -> {
                log.error("Failed to retrieve tickets for the reservation {}", reservationId, throwable);
                failures.add(throwable);
                cdl.countDown();
            })
            .doOnCompleted(cdl::countDown)
            .subscribe(tickets -> dto.setTickets(tickets.stream()
                    .map(ticket -> ReservationDTO.TicketDTO.builder()
                            .guestSeqN(ticket.getGuestSeqN())
                            .qr(ticket.getQr())
                            .qrText(ticket.getQrText())
                            .usedAt(ticket.getUsedAt())
                            .build())
                    .collect(toList())));

    try {
        cdl.await();
    } catch (InterruptedException e) {
        log.debug("Interrupted while waiting on the count down latch!", e);
        Thread.currentThread().interrupt(); // keep the interrupt visible to callers
    }

    if (!failures.isEmpty()) {
        throw new HystrixBadRequestException("Request to submodule has failed");
    }
    return dto;
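Your idea for the desired solution is right, since it uses the zip combinator. The Hystrix commands are not executed in that solution because the resulting Observable has no subscriber. From the documentation: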
toObservable()
— returns a "cold" Observable that won’t subscribe to the underlying Observable until you subscribe to the resulting Observable
Simply call the subscribe() method on the combined Observable:
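    Observable.zip(customerObservable, ticketsObservable, (customer, tickets) -> null)
            .take(1)
            .subscribe(
                    ignored -> { },                             // results are already handled in doOnNext
                    throwable -> subRequestsFailed.set(true));  // without an onError handler, subscribe() throws OnErrorNotImplementedException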
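The "cold" behaviour quoted above can be illustrated without Hystrix or RxJava. The following is only a minimal plain-Java sketch under that assumption, not how RxJava is implemented: ColdSource, its doOnNext and subscribe methods, and ColdSourceDemo are hypothetical names invented for this example. It shows that building a pipeline runs nothing, and that the stored work starts only when someone subscribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a cold Observable: it only stores the work to do.
final class ColdSource<T> {
    private final Supplier<T> work;

    ColdSource(Supplier<T> work) {
        this.work = work;
    }

    // Similar in spirit to doOnNext: wraps the work, still without running it.
    ColdSource<T> doOnNext(Consumer<T> sideEffect) {
        return new ColdSource<T>(() -> {
            T value = work.get();
            sideEffect.accept(value);
            return value;
        });
    }

    // Only subscribing triggers the stored work.
    void subscribe(Consumer<T> onNext) {
        onNext.accept(work.get());
    }
}

final class ColdSourceDemo {
    // Returns the recorded events in the order in which they happened.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        ColdSource<String> customer = new ColdSource<String>(() -> {
            events.add("request sent");
            return "customer-42";
        }).doOnNext(c -> events.add("doOnNext " + c));

        events.add("pipeline built"); // no request has been sent at this point

        customer.subscribe(c -> events.add("subscriber got " + c));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch "pipeline built" is recorded before "request sent", which mirrors why the doOnNext and doOnError callbacks in the question never fired while nothing was subscribed.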
After some digging on the internet, I found a solution. The operation missing from my code was "subscribeOn":
    ResponseDTO result = Observable.zip(
            saleChannelPaxInventoryClient.getAvailabilities(saleChannel, routeLeg, date).subscribeOn(Schedulers.io()),
            saleChannelCarDeckInventoryClient.getAvailabilities(saleChannel, routeLeg, date).subscribeOn(Schedulers.io()),
            (paxAvailabilities, carDeckAvailabilities) -> {
                ResponseDTO out = new ResponseDTO();
                // process results of both observables here
                return out;
            }
    )
            .toBlocking()
            .single();
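A likely reason subscribeOn matters here is that zip subscribes to its sources one after another on the calling thread, so a source that does its work during subscription delays the next one. The sketch below shows that effect with the JDK only, as an analogy rather than RxJava code: CompletableFuture.supplyAsync on a thread pool stands in for subscribeOn(Schedulers.io()), and thenCombine stands in for zip. The class ParallelZipSketch, the method call and the timeouts are all assumptions made for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ParallelZipSketch {

    // Hypothetical remote call. It returns true only if the other call
    // was in flight at the same time, i.e. both reached the latch.
    static boolean call(CountDownLatch bothStarted, long timeoutMillis) {
        bothStarted.countDown();
        try {
            return bothStarted.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Both calls run on the calling thread, one after the other.
    static boolean sequential() {
        CountDownLatch bothStarted = new CountDownLatch(2);
        boolean pax = call(bothStarted, 100);     // times out: the second call has not started
        boolean carDeck = call(bothStarted, 100);
        return pax && carDeck;
    }

    // Each call runs on its own pool thread; the results are combined afterwards.
    static boolean parallel() {
        CountDownLatch bothStarted = new CountDownLatch(2);
        ExecutorService io = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Boolean> pax =
                    CompletableFuture.supplyAsync(() -> call(bothStarted, 5000), io);
            CompletableFuture<Boolean> carDeck =
                    CompletableFuture.supplyAsync(() -> call(bothStarted, 5000), io);
            // Blocks the caller until both results are available.
            return pax.thenCombine(carDeck, (a, b) -> a && b).join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sequential overlap: " + sequential());
        System.out.println("parallel overlap:   " + parallel());
    }
}
```

Here sequential() returns false because the first call gives up waiting before the second one begins, while parallel() returns true as long as both tasks start within the timeout. The RxJava version above relies on the same idea: each source gets its own thread before the results are combined.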