Calling methods in two different ReactiveMongoRepositories in a transaction using Spring Data MongoDB?

When using the reactive programming model with Spring Data MongoDB, you can execute a transaction like this:

Mono<DeleteResult> result = template.inTransaction()                                      
    .execute(action -> action.remove(query(where("id").is("step-1")), Step.class)); 

However, Spring Data MongoDB also has support for "reactive repositories", for example:

public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

  Flux<Person> findByLocationNear(Point location, Distance distance);
}

public interface CarRepository extends ReactiveMongoRepository<Car, String> {

  Flux<Car> findByYear(int year);
}

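My question is: given that you have a ReactiveMongoRepository, can you somehow make use of MongoDB transactions, e.g. insert both a Person and a Car in the same transaction (using PersonRepository and CarRepository in this case)? If so, how do you do it?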

I had also been struggling to find a solution for transactional support in the reactive style with MongoDB and Spring Boot.

Fortunately I figured it out myself. A few things I found via Google helped too, but those were all non-reactive.

Important note - this works well with Spring Boot 2.2.x, but with Spring Boot 2.3.x there are some other issues, because 2.3.x comes with internal rewrites and related changes.

  • You need to use ReactiveMongoTransactionManager along with ReactiveMongoDatabaseFactory. Most of the details are at the end, and I am also sharing the code repo for the same.

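  • For MongoDB to support transactions, we need to make sure the database is running in replica set mode.

    Why do we need that? Because otherwise you will get an error like this:

    Sessions are not supported by the MongoDB cluster to which this client is connected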

The instructions for this are as follows:

  1. Run a docker-compose based MongoDB server using a docker-compose.yml like the following:
version: "3"
services:
    mongo:
        hostname: mongo
        container_name: localmongo_docker
        image: mongo
        expose:
          - 27017
        ports:
          - 27017:27017
        restart: always
        entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]
        volumes:
          - ./mongodata:/data/db # bind mount: database files are kept in ./mongodata on the host

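  2. Once the container is up, run this command (localmongo_docker is the name of the container; on MongoDB 6.0+ images the shell binary is mongosh rather than mongo):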
docker exec -it localmongo_docker mongo
  3. Copy and paste the command below into the shell and execute it:
rs.initiate(
   {
     _id : 'rs0',
     members: [
       { _id : 0, host : "mongo:27017" }
     ]
   }
 )
  4. Then leave the shell by typing exit.

Important - the code repo can be found on my GitHub - https://github.com/krnbr/mongo-spring-boot-template

Important notes on the code:

  • The MongoConfiguration class in the config package is the important part that makes transactions work; it can be found in the repo.

  • The main part is this bean:

     @Bean
     ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
         return new ReactiveMongoTransactionManager(dbFactory);
     }
    
  • To check that the transactional behaviour works as required, go through the UserService class in the service package.

The code is shared below in case the links do not work for someone:

Configuration and the beans inside it

@Configuration
public class MongoConfiguration extends AbstractMongoClientConfiguration {

    @Autowired
    private MongoProperties mongoProperties;

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }

    @Override
    protected String getDatabaseName() {
        return mongoProperties.getDatabase();
    }

    @Override
    public MongoClient mongoClient() {
        return MongoClients.create(mongoProperties.getUri());
    }
}

application.properties (MongoDB-related entries)

spring.data.mongodb.database=mongo
spring.data.mongodb.uri=mongodb://localhost:27017/mongo?replicaSet=rs0
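
The replicaSet=rs0 parameter in the URI above has to name the same replica set that mongod was started with (--replSet rs0) and that rs.initiate used as _id. A minimal sketch of a start-up sanity check that reads the parameter back out of the URI so it can be compared against the expected name. ReplicaSetUriCheck and replicaSetName are hypothetical names, not part of Spring or the MongoDB driver, and the parsing is only meant for simple single-host URIs like the one above. It checks the configuration string only; it cannot tell whether the server really runs as a replica set.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.Optional;

// Hypothetical helper (not part of Spring or the MongoDB driver).
final class ReplicaSetUriCheck {

    // Returns the value of the replicaSet query parameter, if the URI has one.
    public static Optional<String> replicaSetName(String mongoUri) {
        String query = URI.create(mongoUri).getQuery();
        if (query == null) {
            return Optional.empty();
        }
        return Arrays.stream(query.split("&"))
                .map(pair -> pair.split("=", 2))
                // connection string option names are case-insensitive
                .filter(kv -> kv.length == 2 && kv[0].equalsIgnoreCase("replicaSet"))
                .map(kv -> kv[1])
                .findFirst();
    }

    public static void main(String[] args) {
        String uri = "mongodb://localhost:27017/mongo?replicaSet=rs0";
        String expected = "rs0"; // the name passed to --replSet and rs.initiate
        boolean matches = replicaSetName(uri).map(expected::equals).orElse(false);
        System.out.println("replicaSet matches expected name: " + matches);
    }
}
```

A check like this could run before the application context starts, so a missing or misspelled replica set name is reported as a configuration problem rather than surfacing later as a connection or session error.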

Document classes

Role class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "roles")
@TypeAlias("role")
public class Role implements Persistable<String> {

    @Id
    private String id;

    @Field("role_name")
    @Indexed(unique = true)
    private String role;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        return getCreated() == null;
    }
}

User class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "users")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user")
public class User implements Persistable<String> {

    @Id()
    private String id;

    @Field("username")
    @Indexed(unique = true)
    @JsonProperty("username")
    private String userName;

    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private String password;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @DBRef(lazy = true)
    @JsonProperty("roles")
    private List<Role> roles = new ArrayList<>();

    @Override
    @JsonIgnore
    public boolean isNew() {
        return getCreated() == null;
    }
}

UserProfile class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "user_profiles")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user_profile")
public class UserProfile implements Persistable<String> {

    @Id
    private String id;

    @Indexed(unique = true)
    private String mobile;

    @Indexed(unique = true)
    private String email;

    private String address;

    private String firstName;

    private String lastName;

    @DBRef
    private User user;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        return getCreated() == null;
    }

}

Reactive Mongo repository interfaces

RoleRepository

public interface RoleRepository extends ReactiveMongoRepository<Role, String> {

    Mono<Role> findByRole(String role);

    Flux<Role> findAllByRoleIn(List<String> roles);

}

UserRepository

public interface UserRepository extends ReactiveMongoRepository<User, String> {

    Mono<User> findByUserName(String userName);

}

UserProfileRepository

public interface UserProfileRepository extends ReactiveMongoRepository<UserProfile, String> {
}

UserService class. You need to create your own RuntimeException class here; AppRuntimeException is the one I have been using.

@Slf4j
@Service
public class UserService {

    @Autowired
    private RoleRepository roleRepository;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private UserProfileRepository userProfileRepository;

    @Transactional
    public Mono<UserProfile> saveUserAndItsProfile(final UserRequest userRequest) {

        Mono<Role> roleMono = roleRepository.findByRole("USER");

        Mono<User> userMono = roleMono.flatMap(r -> {
            User user = new User()
                    .setUserName(userRequest.getUsername())
                    .setPassword(userRequest.getPassword());
            user.setRoles(Arrays.asList(r));
            return userRepository.save(user);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the username '"+userRequest.getUsername()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        Mono<UserProfile> userProfileMono = userMono.flatMap(u -> {
            UserProfile userProfile = new UserProfile()
                    .setAddress(userRequest.getAddress())
                    .setEmail(userRequest.getEmail())
                    .setMobile(userRequest.getMobile())
                    .setUser(u);
            return userProfileRepository.save(userProfile);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the profile mobile'"+userRequest.getMobile()+"' and/or - email '"+userRequest.getEmail()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        return userProfileMono;

    }

}
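
The service above refers to AppRuntimeException and ErrorCodes without showing them. A minimal sketch of what they could look like, inferred only from how they are called there (message, error code, cause). The HTTP status numbers, the getter names, and the demo class are assumptions, and the real classes in the linked repo may well differ.

```java
// Demo entry point (hypothetical); shows how the exception carries its code.
final class AppRuntimeExceptionDemo {
    public static void main(String[] args) {
        Throwable cause = new IllegalStateException("duplicate key");
        AppRuntimeException ex =
                new AppRuntimeException("username already exists", ErrorCodes.CONFLICT, cause);
        System.out.println(ex.getErrorCode() + " -> " + ex.getErrorCode().getHttpStatus());
    }
}

// Assumed shape: an unchecked exception that keeps an application error code
// next to the usual message and cause.
class AppRuntimeException extends RuntimeException {

    private final ErrorCodes errorCode;

    public AppRuntimeException(String message, ErrorCodes errorCode, Throwable cause) {
        super(message, cause);
        this.errorCode = errorCode;
    }

    public ErrorCodes getErrorCode() {
        return errorCode;
    }
}

// Only the two constants used by UserService; the status mapping is an assumption.
enum ErrorCodes {
    CONFLICT(409),
    INTERNAL_SERVER_ERROR(500);

    private final int httpStatus;

    ErrorCodes(int httpStatus) {
        this.httpStatus = httpStatus;
    }

    public int getHttpStatus() {
        return httpStatus;
    }
}
```

Because the exception is unchecked and is emitted through Mono.error inside the @Transactional method, it reaches the transaction manager as an error signal, which is what triggers the rollback of the earlier save.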

Controller and model classes

UserRequest model class

@Getter
@Setter
@Accessors(chain = true)
@Slf4j
@JsonInclude(JsonInclude.Include.NON_NULL)
public class UserRequest {

    private String username;
    private String password;
    private String mobile;
    private String email;
    private String address;
    private String firstName;
    private String lastName;

}

UserProfileApisController class

@Slf4j
@RestController
@RequestMapping("/apis/user/profile")
public class UserProfileApisController {

    @Autowired
    private UserService userService;

    @PostMapping
    public Mono<UserProfile> saveUserProfile(final @RequestBody UserRequest userRequest) {
        return userService.saveUserAndItsProfile(userRequest);
    }

}

Just an addition to the accepted answer regarding MongoDB replica set initialization.

  1. If you need a single-node replica set on a non-fixed port for testing, you can use the Testcontainers MongoDB Module, which encapsulates this initialization:
final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.2.8");

We can start it via "mongoDBContainer.start()" and stop it via try-with-resources or "mongoDBContainer.stop()". See the Testcontainers documentation for more details on this module.

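  2. If you need a multi-node replica set on a non-fixed port to test complicated production issues, you can use a project that provides MongoDbReplicaSet, for example: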
try (
  //create a PSA mongoDbReplicaSet and auto-close it afterwards
  final MongoDbReplicaSet mongoDbReplicaSet = MongoDbReplicaSet.builder()
    //with 2 working nodes
    .replicaSetNumber(2)
    //with an arbiter node
    .addArbiter(true)
    //create a proxy for each node to simulate network partitioning
    .addToxiproxy(true)
    .build()
) {
  //start it
  mongoDbReplicaSet.start();
  assertNotNull(mongoDbReplicaSet.getReplicaSetUrl());
  //do some testing
}