Spring 云流合并来自两个不同函数的响应

Spring cloud stream merge responses from two different functions

我正在尝试使用 spring 云流来解决以下问题:

我有一个 class 调用两个独立的函数(函数 A 和 B),这两个函数必须并行工作,如果函数 A 完成它必须调用函数 C,如果函数B 完成,但这将调用函数 D,然后我需要等待函数 C 和函数 D 完成并将两个响应合并为一个响应,然后 return 这个合并的响应对象到起始 class那一定是在等待得到那个回应。

我遇到的问题是:

我正在使用

我不能使用 Kafka 需要

示例代码

服务类

@Service
@RequiredArgsConstructor
public class ServiceClass {

    @NonNull
    private final StreamBridge streamBridge;

    @Override
    protected MergedResponse execute(Input input) {
        var send1 = streamBridge.send("functionA-in-0", input);
        var send2 = streamBridge.send("functionB-in-0", input);

        //TODO: Wait for Function E response object
    }
}

函数 A

@Slf4j
@Configuration
public class FunctionAClass{

    @Bean
    public Function<Input, OutputFunctionA> functionA() {
        return input -> {
            //TODO: Invoke Function C to pass OutputFunctionA object
            return OutputFunctionA.builder.build();
        };
    }
}

我不介意使用 SupplierConsumer 而不是 Function

编辑 嗨,@Oleg Zhurakousky 感谢您的帮助,回答您的问题我的问题是:我必须创建一个 REST 端点,该端点使用 N 个不同的第三方 REST 端点(首先是两个,异步是必须的,因为它会顺序处理每个请求太慢)我不需要他们的所有数据,只需要几个字段来构建一个公共对象。我打算使用 streamBridge 来启动前两个函数,它们将为每个第三方构建所需的请求,然后是调用每个端点的函数,然后是用每个端点构建公共对象的函数响应,最后是一个收集公共对象并在我的服务响应中发送它们的函数。如果您还有其他问题,请告诉我。

此致。

两点。

  1. 由于您引入了一个同步点,因此无论您尝试什么,您最终都会有一个阻塞调用,因为同步(例如您的聚合要求)将不得不等待两个响应,然后进行关联等。最重要的是您将要处理一个状态问题,以及如何在聚合期间发生系统崩溃时恢复此类状态。
  2. 聚合并不是 s-c-stream 的真正用例,因此我们没有 framework-based 支持它。我会考虑使用 Spring 集成框架为两个以上的不同端点创建管道,然后使用 [聚合器模式](聚合器模式支持)进行聚合,然后使用 StreamBridge 将数据发送到目标目的地。