如何用 WebFlux 和 MongoDB Reactive 框架替换一个文档?

How to replace one document with WebFlux and MongoDB Reactive frameworks?

我有一个旧项目,我想使用 WebFlux 和 MongoDB 响应式框架对其进行现代化改造,但我完全坚持使用更新方法。我不知道在订阅方法中传递什么。 replaceOne 调用 returns a Publisher 但我应该如何处理它?我读到的其中一个技巧是关于使用 mongo-java-driver-reactivestreams 中的 SubscriberHelpers class,但最新版本 (4.4) 中缺少它。任何帮助将不胜感激。

mongoDatabase.getCollection("players", Player.class).replaceOne(
    Filters.eq("email", player.getEmail()),
    player,
    new ReplaceOptions().upsert(true)
).subscribe( ??? );

更新:我创建了一个 DummySubscriber class 来管理似乎正常工作的订阅。

mongoDatabase.getCollection("players", Player.class).replaceOne(
    Filters.eq("email", player.getEmail()),
    player,
    new ReplaceOptions().upsert(true)
).subscribe(new DummySubscriber<>());

private static class DummySubscriber<T> implements Subscriber<T> {
    @Override
    public void onSubscribe(Subscription subscription) {
        subscription.request(Integer.MAX_VALUE);
    }

    @Override
    public void onNext(T t) {
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error while updating data");
    }

    @Override
    public void onComplete() {
        System.out.println("Data has been updated");
    }
}

查看您的示例后,我发现问题是您在对数据库进行调用时没有使用 ReactiveMongoTemplate

您决定使用原始 MongoDatabase 对象,该对象级别较低,不会为您提供 webflux api。

MongoDatabase 仅实现标准 reactivestreams 规范,该规范仅支持将 Subscriber<T> 作为订阅者实现的内容。

我建议不要使用低级别 MongoDatabase 对象,而是使用更高级别的抽象。

如果你打算使用 spring webflux,你应该使用 spring 提供的 ReactiveRepositories,或者如果你想使用低级别,请使用 ReactiveMongoTemplate

private final ReactiveMongoTemplate reactiveMongoTemplate;

@Autowired
public PlayerRepository(ReactiveMongoTemplate reactiveMongoTemplate) {
    this.reactiveMongoTemplate = reactiveMongoTemplate;
}


public void savePlayers(Player player) {
    final Query query = new Query();
    query.addCriteria(Criteria.where("email").is(player.getEmail()));

    final Update update = new Update();
    update.set("name", player.getName());
    update.set("email", player.getEmail());

    reactiveMongoTemplate.findAndModify(query, update, Player.class)
            .subscribe(
                updatedPlayer -> System.out.println("updated player"),
                error -> System.out.println("Something went wrong: " + error.getCause()),
                () -> System.out.println("update is finished"));
}

我强烈建议您在继续使用 webfluxReactiveMongoTemplate

之前学习如何在不使用 webflux 的情况下使用常规 MongoTemplate

有几个教程,关于使用 MongoTemplateReactiveMongoTemplate

https://www.baeldung.com/spring-data-mongodb-reactive

https://hantsy.github.io/spring-reactive-sample/data/data-mongo.html