Spring 云流合并来自两个不同函数的响应
Spring cloud stream merge responses from two different functions
我正在尝试使用 spring 云流来解决以下问题:
我有一个 class 调用两个独立的函数(函数 A 和 B),这两个函数必须并行工作,如果函数 A 完成它必须调用函数 C,如果函数B 完成,但这将调用函数 D,然后我需要等待函数 C 和函数 D 完成并将两个响应合并为一个响应,然后 return 这个合并的响应对象到起始 class那一定是在等待得到那个回应。
我遇到的问题是:
- 如何调用函数 C 来传递函数 A 响应?
- 如何等到功能 C 和功能 D 完成并在功能 E 中获得响应?
- 如何在控制器中等待功能E的响应,我正在使用
streamBridge.send
同时启动功能A和功能B。
我正在使用
spring-cloud-stream-3.1.3
spring-cloud-stream-binder-rabbit
必需
我不能使用 Kafka
需要
示例代码
服务类
@Service
@RequiredArgsConstructor
public class ServiceClass {
@NonNull
private final StreamBridge streamBridge;
@Override
protected MergedResponse execute(Input input) {
var send1 = streamBridge.send("functionA-in-0", input);
var send2 = streamBridge.send("functionB-in-0", input);
//TODO: Wait for Function E response object
}
}
函数 A
@Slf4j
@Configuration
public class FunctionAClass{
@Bean
public Function<Input, OutputFunctionA> functionA() {
return input -> {
//TODO: Invoke Function C to pass OutputFunctionA object
return OutputFunctionA.builder.build();
};
}
}
我不介意使用 Supplier
或 Consumer
而不是 Function
。
编辑
嗨,@Oleg Zhurakousky 感谢您的帮助,回答您的问题我的问题是:我必须创建一个 REST 端点,该端点使用 N
个不同的第三方 REST 端点(首先是两个,异步是必须的,因为它会顺序处理每个请求太慢)我不需要他们的所有数据,只需要几个字段来构建一个公共对象。我打算使用 streamBridge
来启动前两个函数,它们将为每个第三方构建所需的请求,然后是调用每个端点的函数,然后是用每个端点构建公共对象的函数响应,最后是一个收集公共对象并在我的服务响应中发送它们的函数。如果您还有其他问题,请告诉我。
此致。
两点。
- 由于您引入了一个同步点,因此无论您尝试什么,您最终都会有一个阻塞调用,因为同步(例如您的聚合要求)将不得不等待两个响应,然后进行关联等。最重要的是您将要处理一个状态问题,以及如何在聚合期间发生系统崩溃时恢复此类状态。
- 聚合并不是 s-c-stream 的真正用例,因此我们没有 framework-based 支持它。我会考虑使用 Spring 集成框架为两个以上的不同端点创建管道,然后使用 [聚合器模式](聚合器模式支持)进行聚合,然后使用 StreamBridge 将数据发送到目标目的地。
我正在尝试使用 spring 云流来解决以下问题:
我有一个 class 调用两个独立的函数(函数 A 和 B),这两个函数必须并行工作,如果函数 A 完成它必须调用函数 C,如果函数B 完成,但这将调用函数 D,然后我需要等待函数 C 和函数 D 完成并将两个响应合并为一个响应,然后 return 这个合并的响应对象到起始 class那一定是在等待得到那个回应。
我遇到的问题是:
- 如何调用函数 C 来传递函数 A 响应?
- 如何等到功能 C 和功能 D 完成并在功能 E 中获得响应?
- 如何在控制器中等待功能E的响应,我正在使用
streamBridge.send
同时启动功能A和功能B。
我正在使用
spring-cloud-stream-3.1.3
spring-cloud-stream-binder-rabbit
必需
我不能使用 Kafka
需要
示例代码
服务类
@Service
@RequiredArgsConstructor
public class ServiceClass {
@NonNull
private final StreamBridge streamBridge;
@Override
protected MergedResponse execute(Input input) {
var send1 = streamBridge.send("functionA-in-0", input);
var send2 = streamBridge.send("functionB-in-0", input);
//TODO: Wait for Function E response object
}
}
函数 A
@Slf4j
@Configuration
public class FunctionAClass{
@Bean
public Function<Input, OutputFunctionA> functionA() {
return input -> {
//TODO: Invoke Function C to pass OutputFunctionA object
return OutputFunctionA.builder.build();
};
}
}
我不介意使用 Supplier
或 Consumer
而不是 Function
。
编辑
嗨,@Oleg Zhurakousky 感谢您的帮助,回答您的问题我的问题是:我必须创建一个 REST 端点,该端点使用 N
个不同的第三方 REST 端点(首先是两个,异步是必须的,因为它会顺序处理每个请求太慢)我不需要他们的所有数据,只需要几个字段来构建一个公共对象。我打算使用 streamBridge
来启动前两个函数,它们将为每个第三方构建所需的请求,然后是调用每个端点的函数,然后是用每个端点构建公共对象的函数响应,最后是一个收集公共对象并在我的服务响应中发送它们的函数。如果您还有其他问题,请告诉我。
此致。
两点。
- 由于您引入了一个同步点,因此无论您尝试什么,您最终都会有一个阻塞调用,因为同步(例如您的聚合要求)将不得不等待两个响应,然后进行关联等。最重要的是您将要处理一个状态问题,以及如何在聚合期间发生系统崩溃时恢复此类状态。
- 聚合并不是 s-c-stream 的真正用例,因此我们没有 framework-based 支持它。我会考虑使用 Spring 集成框架为两个以上的不同端点创建管道,然后使用 [聚合器模式](聚合器模式支持)进行聚合,然后使用 StreamBridge 将数据发送到目标目的地。