Reactor spring mongodb 存储库将多个结果组合在一起

Reactor spring mongodb repository combine multiple results together

我是反应式编程的新手,目前正在开发基于 spring webflux 的应用程序。我被困在几个问题之间。

public class FooServiceImpl {

@Autowired
private FooDao fooDao;

@Autowired
private AService aService;

@Autowired
private BService bService;

public long calculateSomething(long fooId) {
    Foo foo = fooDao.findById(fooId); // Blocking call one

    if (foo == null) {
        foo = new Foo();
    }

    Long bCount = bService.getCountBByFooId(fooId); // Blocking call two
    AEntity aEntity = aService.getAByFooId(fooId);  // Blocking call three

    // Do some calculation using foo, bCount and aEntity
    // ...
    // ...

    return someResult;
}
}

这是我们编写阻塞代码的方式,它使用三个外部 API 调用结果(我们将其视为数据库调用)。我正在努力将其转换为反应式代码,如果所有三个都变成单声道,并且如果我订阅所有三个,外部订阅者会被阻止吗?

public Mono<Long> calculateSomething(long fooId) {
    return Mono.create(sink -> {
        Mono<Foo> monoFoo = fooDao.findById(fooId); // Reactive call one
        monoFoo.subscribe(foo -> {
            if (foo == null) {
                foo = new Foo();
            }

            Mono<Long> monoCount = bService.getCountBByFooId(fooId);  // Reactive call two

            monoCount.subscribe(aLong -> {
                Mono<AEntity> monoA = aService.getAByFooId(fooId);  // Reactive call three
                monoA.subscribe(aEntity -> {
                    //...
                    //...
                    sink.success(someResult);
                });
            });
        });
    };
  }

我看到有一个叫做 zip 的函数,但它只适用于两个结果,那么有没有办法在这里应用它?

另外,如果我们在 create 方法中订阅某些东西会发生什么,它会阻塞线程吗?

如果你能帮助我,将不胜感激。

如果你给了我你想要用这些值做的计算,我会更容易展示反应堆的计算方式。但是,假设您想从数据库中读取一个值,然后将该值用于另一件事。使用平面图并制作独特的 Flux 减少代码行和复杂性,无需像其他人所说的那样使用 subscribe() 。示例:

return fooDao.findById(fooId)
.flatmap(foo -> bService.getCountBByFooId(foo))
.flatmap(bCount -> aService.getAByFooId(fooId).getCount()+bCount);