响应式编程:Spring WebFlux:如何构建微服务调用链?
Reactive Programming: Spring WebFlux: How to build a chain of micro-service calls?
Spring Boot
申请:
a @RestController
收到以下负载:
{
"cartoon": "The Little Mermaid",
"characterNames": ["Ariel", "Prince Eric", "Sebastian", "Flounder"]
}
我需要进行如下处理:
- 获取每个角色名称的唯一 ID:对 "cartoon-characters" 微服务进行 HTTP 调用,returns ids by names
转换控制器接收到的数据:
用上一步从 "cartoon-characters" 微服务接收到的适当 ID 替换字符名称。
{
"cartoon": "The Little Mermaid",
"characterIds": [1, 2, 3, 4]
}
向 "cartoon-db" 微服务发送一个 HTTP POST 请求并转换数据。
- 将来自 "cartoon-db" 的响应映射到作为控制器 return 值的内部表示。
我遇到的问题:
我需要使用 Reactive Programming
(non-blocking\async 处理)和 Spring WebFlux
(Mono
|Flux
)和 Spring Reactive WebClient
- 但我对该堆栈的经验为零,尽可能多地阅读它,再加上谷歌搜索,但仍然有很多未解决的问题,例如:
Q1。我已经配置了响应式 webClient,它向 "cartoon-characters" 微服务发送请求:
public Mono<Integer> getCartoonCharacterIdbyName(String characterName) {
return WebClient.builder().baseUrl("http://cartoon-characters").build()
.get()
.uri("/character/{characterName}", characterName)
.retrieve()
.bodyToMono(Integer.class);
}
如您所见,我有一个卡通人物名称列表,我需要为每个人物调用 getCartoonCharacterIdbyName(String name)
方法,我不确定按系列调用它的正确选项,相信正确选项:并行执行。
写了如下方法:
public List<Integer> getCartoonCharacterIds(List<String> names) {
Flux<Integer> flux = Flux.fromStream(names.stream())
.flatMap(this::getCartoonCharacterIdbyName);
return StreamSupport.stream(flux.toIterable().spliterator(), false)
.collect(Collectors.toList());
}
但我有疑问,这段代码确实并行 WebClient
执行,而且代码调用 flux.toIterable()
阻塞了线程,所以在这个实现中我失去了非阻塞机制。
我的假设是否正确?
我需要如何将其重写为具有并行性和非阻塞性?
Q2.
技术上是否有可能以反应式方式转换控制器接收到的输入数据(我的意思是用 ID 替换名称):当我们使用 Flux<Integer>
characterIds 而不是 List<Integer>
of characterIds 时?
Q3. 是否有可能在 第 2 步 之后不仅获得转换后的 Data 对象,而且获得可以使用的 Mono<>由另一个 WebClient 在 步骤 3?
实际上,这是一个很好的问题,因为了解 WebFlux 或项目反应器框架,在链接微服务时需要几个步骤。
首先是认识到 WebClient
应该接受发布者,return 应该接受发布者。将此推断为 4 种不同的方法签名以帮助思考。
- 单声道 -> 单声道
- 通量 -> 通量
- 单声道 -> 通量
- 通量 -> 单声道
当然,在所有情况下,它只是 Publisher->Publisher,但在您更好地理解之前先保留它。前两个很明显,你只是用.map(...)
来处理流中的对象,但你需要学习如何处理后两个。如上所述,从 Flux->Mono 可以用 .collectList()
或 .reduce(...)
完成。从 Mono->Flux 似乎通常用 .flatMapMany
或 .flatMapIterable
或它的一些变体来完成。可能还有其他技术。您永远不应在任何 WebFlux 代码中使用 .block()
,如果您尝试这样做,通常会出现运行时错误。
在你的例子中你想去
- (单声道->通量)->(通量->通量)->(通量->通量)
如你所说,你想要
- 单声道->通量->通量
第二部分是了解链接流。你可以做
- p3(p2(p1(对象)));
这会链接 p1->p2->p3,但我总是发现改为 "Service Layer" 更容易理解。
- o2 = p1(对象);
- o3 = p2(o2);
- 结果 = p3(o3);
这段代码更易于阅读和维护,并且随着时间的推移,您会逐渐理解该语句的价值。
我在你的例子中遇到的唯一问题是 Flux<String>
和 WebClient
作为 @RequestBody
。不起作用。参见 WebClient bodyToFlux(String.class) for string list doesn't separate individual values。除此之外,它是一个非常简单的应用程序。当你调试它时,你会发现它在到达 Flux<Integer> ids = mapNamesToIds(fn)
行之前到达 .subscribe(System.out::println)
行。这是因为流程是在执行之前设置的。需要一段时间才能理解这一点,但这是项目反应堆框架的重点。
@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
Map<Integer, CartoonCharacter> characters;
@Override
public void run(ApplicationArguments args) throws Exception {
String[] names = new String[] {"Ariel", "Prince Eric", "Sebastian", "Flounder"};
characters = Arrays.asList( new CartoonCharacter[] {
new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"),
new CartoonCharacter(names[1].hashCode(), names[1], "Human"),
new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"),
new CartoonCharacter(names[3].hashCode(), names[3], "Fish")}
)
.stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
// TODO Auto-generated method stub
CartoonRequest cr = CartoonRequest.builder()
.cartoon("The Little Mermaid")
.characterNames(Arrays.asList(names))
.build();
thisLocalClient
.post()
.uri("cartoonDetails")
.body(Mono.just(cr), CartoonRequest.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class)
.subscribe(System.out::println);
}
@Bean
WebClient localClient() {
return WebClient.create("http://localhost:8080/demo/");
}
@Autowired
WebClient thisLocalClient;
@PostMapping("cartoonDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest) {
Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
Flux<Integer> ids = mapNamesToIds(fn);
Flux<CartoonCharacter> details = mapIdsToDetails(ids);
return details;
}
// Service Layer Methods
private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names) {
return thisLocalClient
.post()
.uri("findIds")
.body(names, StringWrapper.class)
.retrieve()
.bodyToFlux(Integer.class);
}
private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids) {
return thisLocalClient
.post()
.uri("findDetails")
.body(ids, Integer.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class);
}
// Services
@PostMapping("findIds")
Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names) {
return names.map(name->name.getString().hashCode());
}
@PostMapping("findDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids) {
return ids.map(characters::get);
}
}
还有:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper {
private String string;
}
@Data
@Builder
public class CartoonRequest {
private String cartoon;
private List<String> characterNames;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter {
Integer id;
String name;
String species;
}
Spring Boot
申请:
a @RestController
收到以下负载:
{
"cartoon": "The Little Mermaid",
"characterNames": ["Ariel", "Prince Eric", "Sebastian", "Flounder"]
}
我需要进行如下处理:
- 获取每个角色名称的唯一 ID:对 "cartoon-characters" 微服务进行 HTTP 调用,returns ids by names
转换控制器接收到的数据: 用上一步从 "cartoon-characters" 微服务接收到的适当 ID 替换字符名称。
{ "cartoon": "The Little Mermaid", "characterIds": [1, 2, 3, 4] }
向 "cartoon-db" 微服务发送一个 HTTP POST 请求并转换数据。
- 将来自 "cartoon-db" 的响应映射到作为控制器 return 值的内部表示。
我遇到的问题:
我需要使用 Reactive Programming
(non-blocking\async 处理)和 Spring WebFlux
(Mono
|Flux
)和 Spring Reactive WebClient
- 但我对该堆栈的经验为零,尽可能多地阅读它,再加上谷歌搜索,但仍然有很多未解决的问题,例如:
Q1。我已经配置了响应式 webClient,它向 "cartoon-characters" 微服务发送请求:
public Mono<Integer> getCartoonCharacterIdbyName(String characterName) {
return WebClient.builder().baseUrl("http://cartoon-characters").build()
.get()
.uri("/character/{characterName}", characterName)
.retrieve()
.bodyToMono(Integer.class);
}
如您所见,我有一个卡通人物名称列表,我需要为每个人物调用 getCartoonCharacterIdbyName(String name)
方法,我不确定按系列调用它的正确选项,相信正确选项:并行执行。
写了如下方法:
public List<Integer> getCartoonCharacterIds(List<String> names) {
Flux<Integer> flux = Flux.fromStream(names.stream())
.flatMap(this::getCartoonCharacterIdbyName);
return StreamSupport.stream(flux.toIterable().spliterator(), false)
.collect(Collectors.toList());
}
但我有疑问,这段代码确实并行 WebClient
执行,而且代码调用 flux.toIterable()
阻塞了线程,所以在这个实现中我失去了非阻塞机制。
我的假设是否正确?
我需要如何将其重写为具有并行性和非阻塞性?
Q2.
技术上是否有可能以反应式方式转换控制器接收到的输入数据(我的意思是用 ID 替换名称):当我们使用 Flux<Integer>
characterIds 而不是 List<Integer>
of characterIds 时?
Q3. 是否有可能在 第 2 步 之后不仅获得转换后的 Data 对象,而且获得可以使用的 Mono<>由另一个 WebClient 在 步骤 3?
实际上,这是一个很好的问题,因为了解 WebFlux 或项目反应器框架,在链接微服务时需要几个步骤。
首先是认识到 WebClient
应该接受发布者,return 应该接受发布者。将此推断为 4 种不同的方法签名以帮助思考。
- 单声道 -> 单声道
- 通量 -> 通量
- 单声道 -> 通量
- 通量 -> 单声道
当然,在所有情况下,它只是 Publisher->Publisher,但在您更好地理解之前先保留它。前两个很明显,你只是用.map(...)
来处理流中的对象,但你需要学习如何处理后两个。如上所述,从 Flux->Mono 可以用 .collectList()
或 .reduce(...)
完成。从 Mono->Flux 似乎通常用 .flatMapMany
或 .flatMapIterable
或它的一些变体来完成。可能还有其他技术。您永远不应在任何 WebFlux 代码中使用 .block()
,如果您尝试这样做,通常会出现运行时错误。
在你的例子中你想去
- (单声道->通量)->(通量->通量)->(通量->通量)
如你所说,你想要
- 单声道->通量->通量
第二部分是了解链接流。你可以做
- p3(p2(p1(对象)));
这会链接 p1->p2->p3,但我总是发现改为 "Service Layer" 更容易理解。
- o2 = p1(对象);
- o3 = p2(o2);
- 结果 = p3(o3);
这段代码更易于阅读和维护,并且随着时间的推移,您会逐渐理解该语句的价值。
我在你的例子中遇到的唯一问题是 Flux<String>
和 WebClient
作为 @RequestBody
。不起作用。参见 WebClient bodyToFlux(String.class) for string list doesn't separate individual values。除此之外,它是一个非常简单的应用程序。当你调试它时,你会发现它在到达 Flux<Integer> ids = mapNamesToIds(fn)
行之前到达 .subscribe(System.out::println)
行。这是因为流程是在执行之前设置的。需要一段时间才能理解这一点,但这是项目反应堆框架的重点。
@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
Map<Integer, CartoonCharacter> characters;
@Override
public void run(ApplicationArguments args) throws Exception {
String[] names = new String[] {"Ariel", "Prince Eric", "Sebastian", "Flounder"};
characters = Arrays.asList( new CartoonCharacter[] {
new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"),
new CartoonCharacter(names[1].hashCode(), names[1], "Human"),
new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"),
new CartoonCharacter(names[3].hashCode(), names[3], "Fish")}
)
.stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
// TODO Auto-generated method stub
CartoonRequest cr = CartoonRequest.builder()
.cartoon("The Little Mermaid")
.characterNames(Arrays.asList(names))
.build();
thisLocalClient
.post()
.uri("cartoonDetails")
.body(Mono.just(cr), CartoonRequest.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class)
.subscribe(System.out::println);
}
@Bean
WebClient localClient() {
return WebClient.create("http://localhost:8080/demo/");
}
@Autowired
WebClient thisLocalClient;
@PostMapping("cartoonDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest) {
Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
Flux<Integer> ids = mapNamesToIds(fn);
Flux<CartoonCharacter> details = mapIdsToDetails(ids);
return details;
}
// Service Layer Methods
private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names) {
return thisLocalClient
.post()
.uri("findIds")
.body(names, StringWrapper.class)
.retrieve()
.bodyToFlux(Integer.class);
}
private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids) {
return thisLocalClient
.post()
.uri("findDetails")
.body(ids, Integer.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class);
}
// Services
@PostMapping("findIds")
Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names) {
return names.map(name->name.getString().hashCode());
}
@PostMapping("findDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids) {
return ids.map(characters::get);
}
}
还有:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper {
private String string;
}
@Data
@Builder
public class CartoonRequest {
private String cartoon;
private List<String> characterNames;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter {
Integer id;
String name;
String species;
}