Calling methods in two different ReactiveMongoRepository interfaces in a transaction using Spring Data MongoDB?
When using the reactive programming model with Spring Data MongoDB, you can execute a transaction like this:
Mono<DeleteResult> result = template.inTransaction()
.execute(action -> action.remove(query(where("id").is("step-1")), Step.class));
But Spring Data MongoDB also supports "reactive repositories", for example:
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
    Flux<Person> findByLocationNear(Point location, Distance distance);
}
and
public interface CarRepository extends ReactiveMongoRepository<Car, String> {
    Flux<Car> findByYear(int year);
}
My question is: given that you have ReactiveMongoRepository interfaces, can you somehow make use of MongoDB transactions, e.g. insert both a Person and a Car in the same transaction (using PersonRepository and CarRepository in this case)? If so, how do you do it?
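I had also been struggling to find a reactive-style solution for transaction support with MongoDB and Spring Boot.
Fortunately I figured it out myself. Some of what I found on Google helped, but all of it was non-reactive.
Important note: this works fine with Spring Boot 2.2.x, but with Spring Boot 2.3.x I ran into some other issues, since that release includes internal rewrites and related changes.
You need to use ReactiveMongoTransactionManager together with ReactiveMongoDatabaseFactory. Most of the details are at the end, and the code repository is shared as well.
For MongoDB to support transactions, the database must be running in replica set mode.
Why is that needed? Because otherwise you get an error like this:
Sessions are not supported by the MongoDB cluster to which this client is connected
The steps are as follows:
- Run a docker-compose based MongoDB server using a docker-compose.yml like this: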
version: "3"
services:
  mongo:
    hostname: mongo
    container_name: localmongo_docker
    image: mongo
    expose:
      - 27017
    ports:
      - 27017:27017
    restart: always
    entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]
    volumes:
      - ./mongodata:/data/db # need to create a docker volume named as mongodata first
- Once the container is up, run this command (localmongo_docker is the container name):
docker exec -it localmongo_docker mongo
- Copy and paste the command below and execute it:
rs.initiate(
  {
    _id : 'rs0',
    members: [
      { _id : 0, host : "mongo:27017" }
    ]
  }
)
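- Then type exit to leave the shell.
Important: the code repository is available on my GitHub: https://github.com/krnbr/mongo-spring-boot-template
Important notes about the code:
The MongoConfiguration class in the config package is the important part that makes transactions work; it can be found in the repository above.
The main part is this bean: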
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
    return new ReactiveMongoTransactionManager(dbFactory);
}
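To check how the transactional requirement works in code, you can go through the UserService class in the service package of the same repository.
The code is shared here in case the link does not work for someone:
Configuration and the beans inside it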
@Configuration
public class MongoConfiguration extends AbstractMongoClientConfiguration {
    @Autowired
    private MongoProperties mongoProperties;

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }

    @Override
    protected String getDatabaseName() {
        return mongoProperties.getDatabase();
    }

    @Override
    public MongoClient mongoClient() {
        return MongoClients.create(mongoProperties.getUri());
    }
}
application.properties (MongoDB-related)
spring.data.mongodb.database=mongo
spring.data.mongodb.uri=mongodb://localhost:27017/mongo?replicaSet=rs0
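Since a missing replicaSet parameter in the URI is an easy mistake to make, a small stand-alone check along these lines might help, for example in a startup sanity test. This is only a sketch: the MongoUriCheck class and its replicaSetName method are hypothetical names, it relies on java.net.URI rather than the MongoDB driver, and it is only meant for simple single-host URIs like the one above.

```java
import java.net.URI;
import java.util.Arrays;

// Hypothetical helper, not part of Spring or the MongoDB driver.
final class MongoUriCheck {

    private MongoUriCheck() {
    }

    // Returns the value of the replicaSet query parameter, or null if it is absent.
    static String replicaSetName(String mongoUri) {
        String query = URI.create(mongoUri).getQuery();
        if (query == null) {
            return null;
        }
        return Arrays.stream(query.split("&"))
                .map(pair -> pair.split("=", 2))
                // Option names are compared case-insensitively here (an assumption of this sketch)
                .filter(kv -> kv.length == 2 && kv[0].equalsIgnoreCase("replicaSet"))
                .map(kv -> kv[1])
                .findFirst()
                .orElse(null);
    }
}
```

Calling MongoUriCheck.replicaSetName with the value of spring.data.mongodb.uri and failing fast when it returns null would surface the misconfiguration before the first transactional call does.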
Document classes
Role class
@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "roles")
@TypeAlias("role")
public class Role implements Persistable<String> {
    @Id
    private String id;
    @Field("role_name")
    @Indexed(unique = true)
    private String role;
    @CreatedDate
    private ZonedDateTime created;
    @LastModifiedDate
    private ZonedDateTime updated;
    private Boolean deleted;
    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        // New as long as no creation timestamp has been set
        return getCreated() == null;
    }
}
User class
@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "users")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user")
public class User implements Persistable<String> {
    @Id
    private String id;
    @Field("username")
    @Indexed(unique = true)
    @JsonProperty("username")
    private String userName;
    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private String password;
    @CreatedDate
    private ZonedDateTime created;
    @LastModifiedDate
    private ZonedDateTime updated;
    private Boolean deleted;
    private Boolean enabled;
    @DBRef(lazy = true)
    @JsonProperty("roles")
    private List<Role> roles = new ArrayList<>();

    @Override
    @JsonIgnore
    public boolean isNew() {
        // New as long as no creation timestamp has been set
        return getCreated() == null;
    }
}
UserProfile class
@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "user_profiles")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user_profile")
public class UserProfile implements Persistable<String> {
    @Id
    private String id;
    @Indexed(unique = true)
    private String mobile;
    @Indexed(unique = true)
    private String email;
    private String address;
    private String firstName;
    private String lastName;
    @DBRef
    private User user;
    @CreatedDate
    private ZonedDateTime created;
    @LastModifiedDate
    private ZonedDateTime updated;
    private Boolean deleted;
    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        // New as long as no creation timestamp has been set
        return getCreated() == null;
    }
}
Reactive Mongo repository interfaces
RoleRepository
public interface RoleRepository extends ReactiveMongoRepository<Role, String> {
    Mono<Role> findByRole(String role);
    Flux<Role> findAllByRoleIn(List<String> roles);
}
UserRepository
public interface UserRepository extends ReactiveMongoRepository<User, String> {
    Mono<User> findByUserName(String userName);
}
UserProfileRepository
public interface UserProfileRepository extends ReactiveMongoRepository<UserProfile, String> {
}
UserService class. You need to create your own RuntimeException class here; mine is the AppRuntimeException class that I have been using.
@Slf4j
@Service
public class UserService {
    @Autowired
    private RoleRepository roleRepository;
    @Autowired
    private UserRepository userRepository;
    @Autowired
    private UserProfileRepository userProfileRepository;

    // Both saves below run in one MongoDB transaction; an error signal rolls it back
    @Transactional
    public Mono<UserProfile> saveUserAndItsProfile(final UserRequest userRequest) {
        Mono<Role> roleMono = roleRepository.findByRole("USER");

        // Step 1: save the user with the USER role
        Mono<User> userMono = roleMono.flatMap(r -> {
            User user = new User()
                    .setUserName(userRequest.getUsername())
                    .setPassword(userRequest.getPassword());
            user.setRoles(Arrays.asList(r));
            return userRepository.save(user);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if (ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the username '" + userRequest.getUsername() + "' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        // Step 2: save the profile that references the user saved in step 1
        Mono<UserProfile> userProfileMono = userMono.flatMap(u -> {
            UserProfile userProfile = new UserProfile()
                    .setAddress(userRequest.getAddress())
                    .setEmail(userRequest.getEmail())
                    .setMobile(userRequest.getMobile())
                    .setUser(u);
            return userProfileRepository.save(userProfile);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if (ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the profile mobile '" + userRequest.getMobile() + "' and/or email '" + userRequest.getEmail() + "' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        return userProfileMono;
    }
}
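The service above relies on an AppRuntimeException class and an ErrorCodes type that are not shown in this post. A minimal sketch of what they could look like is given below. It is inferred only from how they are called above (a message, an error code and a cause); the HTTP status numbers and the accessor names are assumptions made for illustration, and the actual classes in the repository may differ.

```java
// Hypothetical sketch; the real classes live in the linked repository and may differ.
enum ErrorCodes {
    CONFLICT(409),
    INTERNAL_SERVER_ERROR(500);

    // Assumed mapping to an HTTP status, not confirmed by the original post
    private final int httpStatus;

    ErrorCodes(int httpStatus) {
        this.httpStatus = httpStatus;
    }

    int getHttpStatus() {
        return httpStatus;
    }
}

// Unchecked exception carrying an error code alongside the message and cause.
class AppRuntimeException extends RuntimeException {

    private final ErrorCodes errorCode;

    AppRuntimeException(String message, ErrorCodes errorCode, Throwable cause) {
        super(message, cause);
        this.errorCode = errorCode;
    }

    ErrorCodes getErrorCode() {
        return errorCode;
    }
}
```

Extending RuntimeException matters here: with Spring's default rollback rules, an unchecked exception signalled from a @Transactional method triggers a rollback, so a failure while saving the profile also undoes the user insert.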
Controller and model classes
UserRequest model class
@Getter
@Setter
@Accessors(chain = true)
@Slf4j
@JsonInclude(JsonInclude.Include.NON_NULL)
public class UserRequest {
    private String username;
    private String password;
    private String mobile;
    private String email;
    private String address;
    private String firstName;
    private String lastName;
}
UserProfileApisController class
@Slf4j
@RestController
@RequestMapping("/apis/user/profile")
public class UserProfileApisController {
    @Autowired
    private UserService userService;

    @PostMapping
    public Mono<UserProfile> saveUserProfile(final @RequestBody UserRequest userRequest) {
        return userService.saveUserAndItsProfile(userRequest);
    }
}
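Just an addition to the accepted answer regarding MongoDB replica set initialization.
- If you need a single-node replica set on a non-fixed port for testing, you can use the Testcontainers MongoDB Module, which encapsulates this initialization:
final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.2.8");
You can start it via mongoDBContainer.start() and stop it via try-with-resources or mongoDBContainer.stop().
- If you need a multi-node replica set on non-fixed ports to test complex production issues, you can use a project that provides a MongoDbReplicaSet builder, for example: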
try (
    // create a PSA mongoDbReplicaSet and auto-close it afterwards
    final MongoDbReplicaSet mongoDbReplicaSet = MongoDbReplicaSet.builder()
        // with 2 working nodes
        .replicaSetNumber(2)
        // with an arbiter node
        .addArbiter(true)
        // create a proxy for each node to simulate network partitioning
        .addToxiproxy(true)
        .build()
) {
    // start it
    mongoDbReplicaSet.start();
    assertNotNull(mongoDbReplicaSet.getReplicaSetUrl());
    // do some testing
}