整合来自不同微服务的 Flux 结果的正确方法是什么?
What is the right way to consolidate Flux results from different micro-services?
我有一个边缘服务,它整合了来自 3 个不同微服务的结果。
- Returns 大量客户
- Returns 客户配置文件的单声道
- Returns 客户的角色变化
构建 Flux of CustomerProfile 对象的正确方法是什么,该对象将包含有关客户、配置文件和角色的信息?
@RequiredArgsConstructor
@RestController
class CrmRestController {
private final CrmClient crmClient;
@GetMapping("/customer-profiles")
public Flux<CustomerProfile> getCustomerProfiles() {
return crmClient.getCustomerProfiles();
}
}
@RequiredArgsConstructor
@Component
class CrmClient {
private final WebClient http;
private final RSocketRequester rsocket;
Flux<Customer> getCustomers() {
return this.http.get().uri("http://localhost:8080/customers").retrieve().bodyToFlux(Customer.class);
}
Flux<Role> getRolesFor(Integer customerId) {
return this.http.get().uri("http://localhost:8183/roles").retrieve().bodyToFlux(Role.class);
}
Mono<Profile> getProfileFor(Integer customerId) {
return this.rsocket.route("profiles.{customerId}", customerId).retrieveMono(Profile.class);
}
Flux<CustomerProfile> getCustomerProfiles() {
//TODO: What is the right way to create CustomerProfile objects and fill in values?
//my best guess does not compile
return getCustomers()
.flatMap(customer -> getProfileFor(customer.getId())
.map(profile -> getRolesFor(customer.getId())
.collectList().map(roles -> new CustomerProfile(customer, profile, roles))));
}
}
@Data
@AllArgsConstructor
class Customer {
private Integer id;
private String name;
}
@Data
@AllArgsConstructor
class Profile {
Integer id;
Date registered;
}
@Data
@AllArgsConstructor
class Role {
private Integer id;
private String title;
}
@Data
@AllArgsConstructor
class CustomerProfile {
private Customer customer;
private Profile profile;
private List<Role> roles;
}
所以,就像我上面说的,你的语法错误很容易修复,但你的问题需要反应性实践的解释。
记住,反应式是关于 I/O 操作期间的“非阻塞”。每次调用您的依赖服务之一时,您都会“阻塞”。对 getRolesFor
和 getProfileFor
的调用在进行数据库查找时被阻塞。因此,他们 return 反应性响应为 Mono
或 Flux
。这并不是说它们不阻塞,它们确实阻塞,但是 WebFlux/Reactive 框架允许服务器在同一线程上处理其他请求。在传统模型中,服务器必须为其他请求创建新线程,而当前请求线程因等待相关服务而被阻塞。
在您的请求中,您可以将 customerId 传递给 getRolesFor
和 getProfileFor
,这样您就可以同时调用这两个服务并合并结果。
|--getProfilesFor()->|
getCustomers() -->|------------------->|-->return new CustomerProfile()
|--getRolesFor()---->|
恕我直言,这是最“有效”的。
这个代码是:
@RequiredArgsConstructor
@Component
class CrmClient {
private final WebClient http;
Flux<Customer> getCustomers() {
return this.http.get().uri("http://localhost:8080/customers").retrieve().bodyToFlux(Customer.class);
}
Mono<List<Role>> getRolesFor(Integer customerId) {
ParameterizedTypeReference<List<Role>> rolesType = new ParameterizedTypeReference() {};
return this.http.get().uri("http://localhost:8183/roles").retrieve().bodyToMono(rolesType);
}
Mono<Profile> getProfileFor(Integer customerId) {
return this.http.get().uri("http://localhost:8183/roles").retrieve().bodyToMono(Profile.class);
}
Flux<CustomerProfile> getCustomerProfiles() {
return getCustomers()
.flatMap(c -> Mono.zip(getRolesFor(c.getId()), getProfileFor(c.getId()))
.map(t -> new CustomerProfile(c, t.getT2(), t.getT1())));
}
}
但请注意,我已经更改了 getRolesFor
签名。这是因为 Flux
签名不正确。您没有从数据库中获取 Flux
,而是 Mono<List<Role>>
.
数据库不会 return 基于 getRoles()
的一些内部不确定事件的结果流,因为单个客户只有少量 Roles
。相反,数据库服务器将创建一个多行结果并 return 一次全部创建。
你的投诉可以被听到。但是 SpringDataJpa getRolesByCustomer
(或任何它是什么)return 是 Flux
。这是默认实现,但不适用于此用例。您可以通过制作一个自定义方法来解决这个问题,returns 是 Mono<List<Role>>
哪个更好。
因此,假设您将 getProfilesFor()
更改为正确的签名,上面的结果代码将
- 获取客户
- 对于每个客户,它将
- 同时将 customerId 传递给
getProfilesFor
和 getRolesFor
并且
- 当两个结果都被 returned 时,它将把它们传递给地图函数,
- 将实例化并 return 一个
CustomerProfile
对象。
等等,还有更多。 Reactive 框架首先构建流,然后执行它。流的创建总是阻塞的,因此框架可以初始化。在这种情况下,getCustomers()
创建了流程,因此在 getCustomers()
完成之前构建过程不会开始,这将是阻塞的。
要解决此问题,您需要
return Flux.from((getCustomers())...
这种流程是从 Flux.from()
发布者创建的,并且 getCustomers()
在构建过程完成并订阅流程之前不会执行。
最后,请注意所有相同的讨论也适用于 getCustomers
调用,但由于获得 Mono<List<Customer>>
可能不会获得显着的性能优势,而不是 Flux<Customer>
不值得为此担心。
不过,如果你想知道:
Mono<List<Customer>> getCustomers() {
return this.http.get().uri("http://localhost:8080/customers").retrieve().bodyToMono(new ParameterizedTypeReference<>() {});
}
Mono<List<CustomerProfile>> getCustomerProfiles() {
return Mono.from(getCustomers())
.flatMap(cs -> Flux.fromIterable(cs).flatMap(c -> Mono.zip(getRolesFor(c.getId()), getProfileFor(c.getId()))
.map(t -> new CustomerProfile(c, t.getT2(), t.getT1())))
.collect(Collectors.toList()));
}
我认为这是最有效的,但是,我也认为 Flux.fromIterable(cs)
将对 getRolesFor
和 getProfileFor
进行多次异步调用,如果有很多顾客。真的,如果您获得所有客户,那么您还可以获得所有角色和所有配置文件,并在客户端中并行构建过程。然而,此时,您正在执行数据库服务器优化的操作,因此您最好创建一个查询以将整个数据库连接到结果集并直接从中创建 CustomerProfiles
。
将 Customer
、Role
和 Profile
实体分解为单独的服务应被视为分布式系统反模式。而是围绕业务流程构建服务。
我有一个边缘服务,它整合了来自 3 个不同微服务的结果。
- Returns 大量客户
- Returns 客户配置文件的单声道
- Returns 客户的角色变化
构建 Flux of CustomerProfile 对象的正确方法是什么,该对象将包含有关客户、配置文件和角色的信息?
@RequiredArgsConstructor
@RestController
class CrmRestController {
private final CrmClient crmClient;
@GetMapping("/customer-profiles")
public Flux<CustomerProfile> getCustomerProfiles() {
return crmClient.getCustomerProfiles();
}
}
@RequiredArgsConstructor
@Component
class CrmClient {
private final WebClient http;
private final RSocketRequester rsocket;
Flux<Customer> getCustomers() {
return this.http.get().uri("http://localhost:8080/customers").retrieve().bodyToFlux(Customer.class);
}
Flux<Role> getRolesFor(Integer customerId) {
return this.http.get().uri("http://localhost:8183/roles").retrieve().bodyToFlux(Role.class);
}
Mono<Profile> getProfileFor(Integer customerId) {
return this.rsocket.route("profiles.{customerId}", customerId).retrieveMono(Profile.class);
}
Flux<CustomerProfile> getCustomerProfiles() {
//TODO: What is the right way to create CustomerProfile objects and fill in values?
//my best guess does not compile
return getCustomers()
.flatMap(customer -> getProfileFor(customer.getId())
.map(profile -> getRolesFor(customer.getId())
.collectList().map(roles -> new CustomerProfile(customer, profile, roles))));
}
}
@Data
@AllArgsConstructor
class Customer {
private Integer id;
private String name;
}
@Data
@AllArgsConstructor
class Profile {
Integer id;
Date registered;
}
@Data
@AllArgsConstructor
class Role {
private Integer id;
private String title;
}
@Data
@AllArgsConstructor
class CustomerProfile {
private Customer customer;
private Profile profile;
private List<Role> roles;
}
所以,就像我上面说的,你的语法错误很容易修复,但你的问题需要反应性实践的解释。
记住,反应式是关于 I/O 操作期间的“非阻塞”。每次调用您的依赖服务之一时,您都会“阻塞”。对 getRolesFor
和 getProfileFor
的调用在进行数据库查找时被阻塞。因此,他们 return 反应性响应为 Mono
或 Flux
。这并不是说它们不阻塞,它们确实阻塞,但是 WebFlux/Reactive 框架允许服务器在同一线程上处理其他请求。在传统模型中,服务器必须为其他请求创建新线程,而当前请求线程因等待相关服务而被阻塞。
在您的请求中,您可以将 customerId 传递给 getRolesFor
和 getProfileFor
,这样您就可以同时调用这两个服务并合并结果。
|--getProfilesFor()->|
getCustomers() -->|------------------->|-->return new CustomerProfile()
|--getRolesFor()---->|
恕我直言,这是最“有效”的。
这个代码是:
@RequiredArgsConstructor
@Component
class CrmClient {
private final WebClient http;
Flux<Customer> getCustomers() {
return this.http.get().uri("http://localhost:8080/customers").retrieve().bodyToFlux(Customer.class);
}
Mono<List<Role>> getRolesFor(Integer customerId) {
ParameterizedTypeReference<List<Role>> rolesType = new ParameterizedTypeReference() {};
return this.http.get().uri("http://localhost:8183/roles").retrieve().bodyToMono(rolesType);
}
Mono<Profile> getProfileFor(Integer customerId) {
return this.http.get().uri("http://localhost:8183/roles").retrieve().bodyToMono(Profile.class);
}
Flux<CustomerProfile> getCustomerProfiles() {
return getCustomers()
.flatMap(c -> Mono.zip(getRolesFor(c.getId()), getProfileFor(c.getId()))
.map(t -> new CustomerProfile(c, t.getT2(), t.getT1())));
}
}
但请注意,我已经更改了 getRolesFor
签名。这是因为 Flux
签名不正确。您没有从数据库中获取 Flux
,而是 Mono<List<Role>>
.
数据库不会 return 基于 getRoles()
的一些内部不确定事件的结果流,因为单个客户只有少量 Roles
。相反,数据库服务器将创建一个多行结果并 return 一次全部创建。
你的投诉可以被听到。但是 SpringDataJpa getRolesByCustomer
(或任何它是什么)return 是 Flux
。这是默认实现,但不适用于此用例。您可以通过制作一个自定义方法来解决这个问题,returns 是 Mono<List<Role>>
哪个更好。
因此,假设您将 getProfilesFor()
更改为正确的签名,上面的结果代码将
- 获取客户
- 对于每个客户,它将
- 同时将 customerId 传递给
getProfilesFor
和getRolesFor
并且 - 当两个结果都被 returned 时,它将把它们传递给地图函数,
- 将实例化并 return 一个
CustomerProfile
对象。
- 同时将 customerId 传递给
等等,还有更多。 Reactive 框架首先构建流,然后执行它。流的创建总是阻塞的,因此框架可以初始化。在这种情况下,getCustomers()
创建了流程,因此在 getCustomers()
完成之前构建过程不会开始,这将是阻塞的。
要解决此问题,您需要
return Flux.from((getCustomers())...
这种流程是从 Flux.from()
发布者创建的,并且 getCustomers()
在构建过程完成并订阅流程之前不会执行。
最后,请注意所有相同的讨论也适用于 getCustomers
调用,但由于获得 Mono<List<Customer>>
可能不会获得显着的性能优势,而不是 Flux<Customer>
不值得为此担心。
不过,如果你想知道:
Mono<List<Customer>> getCustomers() {
return this.http.get().uri("http://localhost:8080/customers").retrieve().bodyToMono(new ParameterizedTypeReference<>() {});
}
Mono<List<CustomerProfile>> getCustomerProfiles() {
return Mono.from(getCustomers())
.flatMap(cs -> Flux.fromIterable(cs).flatMap(c -> Mono.zip(getRolesFor(c.getId()), getProfileFor(c.getId()))
.map(t -> new CustomerProfile(c, t.getT2(), t.getT1())))
.collect(Collectors.toList()));
}
我认为这是最有效的,但是,我也认为 Flux.fromIterable(cs)
将对 getRolesFor
和 getProfileFor
进行多次异步调用,如果有很多顾客。真的,如果您获得所有客户,那么您还可以获得所有角色和所有配置文件,并在客户端中并行构建过程。然而,此时,您正在执行数据库服务器优化的操作,因此您最好创建一个查询以将整个数据库连接到结果集并直接从中创建 CustomerProfiles
。
将 Customer
、Role
和 Profile
实体分解为单独的服务应被视为分布式系统反模式。而是围绕业务流程构建服务。