未执行反应式 Cassandra 回调 doInSession
Reactive Cassandra Callback doInSession is not executed
我按照一个例子稍作修改:https://dzone.com/articles/spring-webflux-first-steps
我的 ServiceImpl 看起来像:
private final HotelRepository hotelRepository;
private final HotelByLetterRepository hotelByLetterRepository;
public HotelServiceImpl(HotelRepository hotelRepository, HotelByLetterRepository hotelByLetterRepository) {
this.hotelRepository = hotelRepository;
this.hotelByLetterRepository = hotelByLetterRepository;
}
@Override
public Mono<Hotel> save(Hotel hotel) {
if (hotel.getId() == null) {
hotel.setId(UUID.randomUUID());
}
Mono<Hotel> saved = hotelRepository.save(hotel);
saved.then(hotelByLetterRepository.save(new HotelByLetter(hotel)));
return saved;
}
在 Hotel
实体保存逻辑后尝试保存 HottelByLetter
。
在存储库中,我注入 ReactiveCassandraOperations
,对于保存方法,我只调用插入方法。
@Repository
public class CassandraHotelRepository implements HotelRepository {
private final ReactiveCassandraOperations cassandraTemplate;
public CassandraHotelRepository(ReactiveCassandraOperations cassandraTemplate) {
this.cassandraTemplate = cassandraTemplate;
}
@Override
public Mono<Hotel> save(Hotel hotel) {
return cassandraTemplate.insert(hotel);
}
}
服务调用后只保存Hotel
,不保存HotelByLetter
。
调试后我发现:
在 ReactiveCqlTemplate
中,方法 createFlux
被正确调用了两次 ReactiveSessionCallback
。
protected <T> Flux<T> createFlux(ReactiveSessionCallback<T> callback) {
Assert.notNull(callback, "ReactiveStatementCallback must not be null");
ReactiveSession session = getSession();
return Flux.defer(() -> callback.doInSession(session));
}
但是,callback.doInSession(session)
插入新酒店只执行一次。
我也尝试扩展 ReactiveCrudRepository
,但同样的问题。
我正在使用:org.springframework.data/spring-data-cassandra/2.0.0.RELEASE
TL;DR;
您需要使用每个发布者运算符的结果来应用创建 Publisher
的实际操作。
说明
Project Reactor 的基本概念是永远不会通过运算符改变 Publisher
,而是返回一个新实例。这与 CompletableFuture
之类的 Future
不同,您可以在其中注册回调,并且您没有义务重用回调注册方法的结果来使其工作。
您的 HotelServiceImpl
代码应如下所示:
class HotelServiceImpl implements HotelService {
// …
@Override
public Mono<Hotel> save(Hotel hotel) {
if (hotel.getId() == null) {
hotel.setId(UUID.randomUUID());
}
Mono<Hotel> saved = hotelRepository.save(hotel);
return saved.then(hotelByLetterRepository.save(new HotelByLetter(hotel)));
}
}
调用 saved.then(…)
会创建一个新的 Mono
。删除(不使用)Mono
将导致不执行 .then(…)
运算符。相反,返回 saved.then(…)
的结果也会保存 HotelByLetter
.
我按照一个例子稍作修改:https://dzone.com/articles/spring-webflux-first-steps
我的 ServiceImpl 看起来像:
private final HotelRepository hotelRepository;
private final HotelByLetterRepository hotelByLetterRepository;
public HotelServiceImpl(HotelRepository hotelRepository, HotelByLetterRepository hotelByLetterRepository) {
this.hotelRepository = hotelRepository;
this.hotelByLetterRepository = hotelByLetterRepository;
}
@Override
public Mono<Hotel> save(Hotel hotel) {
if (hotel.getId() == null) {
hotel.setId(UUID.randomUUID());
}
Mono<Hotel> saved = hotelRepository.save(hotel);
saved.then(hotelByLetterRepository.save(new HotelByLetter(hotel)));
return saved;
}
在 Hotel
实体保存逻辑后尝试保存 HottelByLetter
。
在存储库中,我注入 ReactiveCassandraOperations
,对于保存方法,我只调用插入方法。
@Repository
public class CassandraHotelRepository implements HotelRepository {
private final ReactiveCassandraOperations cassandraTemplate;
public CassandraHotelRepository(ReactiveCassandraOperations cassandraTemplate) {
this.cassandraTemplate = cassandraTemplate;
}
@Override
public Mono<Hotel> save(Hotel hotel) {
return cassandraTemplate.insert(hotel);
}
}
服务调用后只保存Hotel
,不保存HotelByLetter
。
调试后我发现:
在 ReactiveCqlTemplate
中,方法 createFlux
被正确调用了两次 ReactiveSessionCallback
。
protected <T> Flux<T> createFlux(ReactiveSessionCallback<T> callback) {
Assert.notNull(callback, "ReactiveStatementCallback must not be null");
ReactiveSession session = getSession();
return Flux.defer(() -> callback.doInSession(session));
}
但是,callback.doInSession(session)
插入新酒店只执行一次。
我也尝试扩展 ReactiveCrudRepository
,但同样的问题。
我正在使用:org.springframework.data/spring-data-cassandra/2.0.0.RELEASE
TL;DR;
您需要使用每个发布者运算符的结果来应用创建 Publisher
的实际操作。
说明
Project Reactor 的基本概念是永远不会通过运算符改变 Publisher
,而是返回一个新实例。这与 CompletableFuture
之类的 Future
不同,您可以在其中注册回调,并且您没有义务重用回调注册方法的结果来使其工作。
您的 HotelServiceImpl
代码应如下所示:
class HotelServiceImpl implements HotelService {
// …
@Override
public Mono<Hotel> save(Hotel hotel) {
if (hotel.getId() == null) {
hotel.setId(UUID.randomUUID());
}
Mono<Hotel> saved = hotelRepository.save(hotel);
return saved.then(hotelByLetterRepository.save(new HotelByLetter(hotel)));
}
}
调用 saved.then(…)
会创建一个新的 Mono
。删除(不使用)Mono
将导致不执行 .then(…)
运算符。相反,返回 saved.then(…)
的结果也会保存 HotelByLetter
.