未执行反应式 Cassandra 回调 doInSession

Reactive Cassandra Callback doInSession is not executed

我按照一个例子稍作修改:https://dzone.com/articles/spring-webflux-first-steps

我的 ServiceImpl 看起来像:

private final HotelRepository hotelRepository;

private final HotelByLetterRepository hotelByLetterRepository;

public HotelServiceImpl(HotelRepository hotelRepository, HotelByLetterRepository hotelByLetterRepository) {
    this.hotelRepository = hotelRepository;
    this.hotelByLetterRepository = hotelByLetterRepository;
}

@Override
public Mono<Hotel> save(Hotel hotel) {
    if (hotel.getId() == null) {
        hotel.setId(UUID.randomUUID());
    }
    Mono<Hotel> saved = hotelRepository.save(hotel);
    saved.then(hotelByLetterRepository.save(new HotelByLetter(hotel)));
    return saved;
}

Hotel 实体保存逻辑后尝试保存 HottelByLetter

在存储库中,我注入 ReactiveCassandraOperations,对于保存方法,我只调用插入方法。

@Repository
public class CassandraHotelRepository implements HotelRepository {

    private final ReactiveCassandraOperations cassandraTemplate;

    public CassandraHotelRepository(ReactiveCassandraOperations cassandraTemplate) {
        this.cassandraTemplate = cassandraTemplate;
    }

    @Override
    public Mono<Hotel> save(Hotel hotel) {
        return cassandraTemplate.insert(hotel);
    }
}

服务调用后只保存Hotel,不保存HotelByLetter。 调试后我发现:

ReactiveCqlTemplate 中,方法 createFlux 被正确调用了两次 ReactiveSessionCallback

protected <T> Flux<T> createFlux(ReactiveSessionCallback<T> callback) {

    Assert.notNull(callback, "ReactiveStatementCallback must not be null");

    ReactiveSession session = getSession();

    return Flux.defer(() -> callback.doInSession(session));
}

但是,callback.doInSession(session)插入新酒店只执行一次。 我也尝试扩展 ReactiveCrudRepository,但同样的问题。

我正在使用:org.springframework.data/spring-data-cassandra/2.0.0.RELEASE

TL;DR;

您需要使用每个发布者运算符的结果来应用创建 Publisher 的实际操作。

说明

Project Reactor 的基本概念是永远不会通过运算符改变 Publisher,而是返回一个新实例。这与 CompletableFuture 之类的 Future 不同,您可以在其中注册回调,并且您没有义务重用回调注册方法的结果来使其工作。

您的 HotelServiceImpl 代码应如下所示:

class HotelServiceImpl implements HotelService {

    // …

    @Override
    public Mono<Hotel> save(Hotel hotel) {
        if (hotel.getId() == null) {
            hotel.setId(UUID.randomUUID());
        }
        Mono<Hotel> saved = hotelRepository.save(hotel);
        return saved.then(hotelByLetterRepository.save(new HotelByLetter(hotel)));
    }
}

调用 saved.then(…) 会创建一个新的 Mono。删除(不使用)Mono 将导致不执行 .then(…) 运算符。相反,返回 saved.then(…) 的结果也会保存 HotelByLetter.