了解 Spring 的 Web 反应框架
Understanding Spring's Web Reactive Framework
我目前正在使用 SpringBoot 2、spring-boot-starter-webflux 在 netty 和 jOOQ.
下面是我经过数小时的研究和 Whosebug 搜索得出的代码。我内置了很多
登录以查看哪个线程上发生了什么。
用户控制器:
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
return Mono.just(user)
.map(it -> {
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return it;
})
.map(userService::create)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}
用户服务:
public int create(ImUser user) {
return Mono.just(user)
.subscribeOn(Schedulers.elastic())
.map(u -> {
logger.debug("UserService thread: " + Thread.currentThread().getName());
return imUserDao.insertUser(u);
})
.block();
}
UserDao:
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public int insertUser(ImUser user) {
logger.debug("Insert DB on thread: " + Thread.currentThread().getName());
return dsl.insertInto(IM_USER,IM_USER.VERSION, IM_USER.FIRST_NAME, IM_USER.LAST_NAME, IM_USER.BIRTHDATE, IM_USER.GENDER)
.values(1, user.getFirstName(), user.getLastName(), user.getBirthdate(), user.getGender())
.returning(IM_USER.ID)
.fetchOne()
.getId();
}
代码按预期工作,"Receiving request" 和 "Sending response" 都 运行 在同一个线程上(reactor-http-server-epoll-x)
而阻塞代码(对 imUserDao.insertUser(u) 的调用)运行s 在弹性调度程序线程上(elastic-x).
事务绑定到调用注释方法的线程(即 elastic-x),因此按预期工作(我已经测试过
为简单起见,这里未发布其他方法。
这是一个日志样本:
20:57:21,384 DEBUG admin.UserController| Receiving request on thread: reactor-http-server-epoll-7
20:57:21,387 DEBUG admin.UserService| UserService thread: elastic-2
20:57:21,391 DEBUG admin.ExtendedUserDao| Insert DB on thread: elastic-2
20:57:21,393 DEBUG tools.LoggerListener| Executing query
...
20:57:21,401 DEBUG tools.StopWatch| Finishing : Total: 9.355ms, +3.355ms
20:57:21,409 DEBUG admin.UserController| Sending response on thread: reactor-http-server-epoll-7
我已经研究反应式编程很长时间了,但从来没有完全接触过任何反应式编程。既然我是,我想知道我是否做对了。
所以这是我的问题:
1. 上面的代码是处理传入 HTTP 请求、查询数据库然后响应的好方法吗?
请忽略我为了理智而内置的 logger.debug(...) 调用 :) 我有点希望有一个 Flux< ImUser> 作为控制器方法的参数,在我有多个潜在请求流的意义上
这将在某个时候出现,并且将以相同的方式处理。相反,我发现的示例会在每次收到请求时创建一个 Mono.from(...);。
2. 在UserService中创建的第二个Mono(Mono.just(user))感觉有些别扭。我知道我需要开始一个新的流才能
运行 elastic Scheduler 上的代码,但是没有执行此操作的运算符吗?
3.从代码的写法来看,我了解到UserService里面的Mono会一直阻塞到DB操作完成,
但是服务于请求的原始流没有被阻塞。这个对吗?
4. 我计划用并行调度程序替换 Schedulers.elastic(),我可以在其中配置工作人员的数量线程。这个想法是最大工作线程数应该与最大数据库连接数相同。
当 Scheduler 中的所有工作线程都忙时会发生什么?那是背压进入的时候吗?
5. 我最初希望在我的控制器中包含以下代码:
return userService.create(user)
.map(it -> ResponseEntity.status(HttpStatus.CREATED).body(it))
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
但我没能做到这一点,也无法将 运行ning 的东西放在正确的线程中。有什么方法可以在我的代码中实现这个吗?
如有任何帮助,我们将不胜感激。谢谢!
服务和控制器
您的服务阻塞的事实是有问题的,因为在控制器中您从 map
内部调用一个阻塞方法,该方法不会在单独的线程上移动。这有可能阻止所有个控制器。
你可以做的是从 UserService#create
return 一个 Mono
(删除末尾的 block()
)。由于该服务确保 Dao 方法调用是隔离的,因此问题较少。从那里开始,无需在控制器中执行 Mono.just(user)
:只需直接在生成的 Mono 上调用创建并开始链接运算符:
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
//this log as you saw was executed in the same thread as the controller method
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return userService.create(user)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}
日志记录
请注意,如果您想记录某些内容,有几个比 map
和 returning it
:
更好的选择
doOnNext
方法是为此量身定制的:对反应信号之一做出反应(在这种情况下,onNext
:发出一个值)并执行一些非变异动作,使输出序列与源序列完全相同。 doOn 的 "side-effect" 可以写入控制台或递增统计计数器,例如...还有 doOnComplete、doOnError、doOnSubscribe、doOnCancel 等...
log
只是记录其上方序列中的所有事件。它将检测您是否使用 SLF4J,如果是,则在 DEBUG 级别使用配置的记录器。否则它将使用 JDK 日志记录功能(因此您还需要将其配置为显示 DEBUG 级别日志)。
关于交易的一句话 或者任何依赖于 ThreadLocal
的事情
ThreadLocal 和线程粘性在反应式编程中可能会出现问题,因为底层执行模型在整个序列中保持不变的保证较少。 Flux
可以分几个步骤执行,每个步骤在不同的 Scheduler
中执行(线程或线程池也是如此)。即使在特定步骤,一个值也可以由底层线程池的线程 A 处理,而下一个值稍后到达,将在线程 B 上处理。
在这种情况下,依赖 Thread Local 并不那么简单,我们目前正在积极致力于提供更适合反应世界的替代方案。
您创建连接池大小的池的想法很好,但不一定足够,事务流量可能会使用多个线程,因此事务可能会污染某些线程。
当池用完线程时会发生什么
如果您使用特定的 Scheduler
来隔离阻塞行为,那么一旦它用完线程,它就会抛出一个 RejectedExecutionException
.
我目前正在使用 SpringBoot 2、spring-boot-starter-webflux 在 netty 和 jOOQ.
下面是我经过数小时的研究和 Whosebug 搜索得出的代码。我内置了很多
登录以查看哪个线程上发生了什么。
用户控制器:
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
return Mono.just(user)
.map(it -> {
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return it;
})
.map(userService::create)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}
用户服务:
public int create(ImUser user) {
return Mono.just(user)
.subscribeOn(Schedulers.elastic())
.map(u -> {
logger.debug("UserService thread: " + Thread.currentThread().getName());
return imUserDao.insertUser(u);
})
.block();
}
UserDao:
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public int insertUser(ImUser user) {
logger.debug("Insert DB on thread: " + Thread.currentThread().getName());
return dsl.insertInto(IM_USER,IM_USER.VERSION, IM_USER.FIRST_NAME, IM_USER.LAST_NAME, IM_USER.BIRTHDATE, IM_USER.GENDER)
.values(1, user.getFirstName(), user.getLastName(), user.getBirthdate(), user.getGender())
.returning(IM_USER.ID)
.fetchOne()
.getId();
}
代码按预期工作,"Receiving request" 和 "Sending response" 都 运行 在同一个线程上(reactor-http-server-epoll-x) 而阻塞代码(对 imUserDao.insertUser(u) 的调用)运行s 在弹性调度程序线程上(elastic-x). 事务绑定到调用注释方法的线程(即 elastic-x),因此按预期工作(我已经测试过 为简单起见,这里未发布其他方法。
这是一个日志样本:
20:57:21,384 DEBUG admin.UserController| Receiving request on thread: reactor-http-server-epoll-7
20:57:21,387 DEBUG admin.UserService| UserService thread: elastic-2
20:57:21,391 DEBUG admin.ExtendedUserDao| Insert DB on thread: elastic-2
20:57:21,393 DEBUG tools.LoggerListener| Executing query
...
20:57:21,401 DEBUG tools.StopWatch| Finishing : Total: 9.355ms, +3.355ms
20:57:21,409 DEBUG admin.UserController| Sending response on thread: reactor-http-server-epoll-7
我已经研究反应式编程很长时间了,但从来没有完全接触过任何反应式编程。既然我是,我想知道我是否做对了。 所以这是我的问题:
1. 上面的代码是处理传入 HTTP 请求、查询数据库然后响应的好方法吗? 请忽略我为了理智而内置的 logger.debug(...) 调用 :) 我有点希望有一个 Flux< ImUser> 作为控制器方法的参数,在我有多个潜在请求流的意义上 这将在某个时候出现,并且将以相同的方式处理。相反,我发现的示例会在每次收到请求时创建一个 Mono.from(...);。
2. 在UserService中创建的第二个Mono(Mono.just(user))感觉有些别扭。我知道我需要开始一个新的流才能 运行 elastic Scheduler 上的代码,但是没有执行此操作的运算符吗?
3.从代码的写法来看,我了解到UserService里面的Mono会一直阻塞到DB操作完成, 但是服务于请求的原始流没有被阻塞。这个对吗?
4. 我计划用并行调度程序替换 Schedulers.elastic(),我可以在其中配置工作人员的数量线程。这个想法是最大工作线程数应该与最大数据库连接数相同。 当 Scheduler 中的所有工作线程都忙时会发生什么?那是背压进入的时候吗?
5. 我最初希望在我的控制器中包含以下代码:
return userService.create(user)
.map(it -> ResponseEntity.status(HttpStatus.CREATED).body(it))
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
但我没能做到这一点,也无法将 运行ning 的东西放在正确的线程中。有什么方法可以在我的代码中实现这个吗?
如有任何帮助,我们将不胜感激。谢谢!
服务和控制器
您的服务阻塞的事实是有问题的,因为在控制器中您从 map
内部调用一个阻塞方法,该方法不会在单独的线程上移动。这有可能阻止所有个控制器。
你可以做的是从 UserService#create
return 一个 Mono
(删除末尾的 block()
)。由于该服务确保 Dao 方法调用是隔离的,因此问题较少。从那里开始,无需在控制器中执行 Mono.just(user)
:只需直接在生成的 Mono 上调用创建并开始链接运算符:
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
//this log as you saw was executed in the same thread as the controller method
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return userService.create(user)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}
日志记录
请注意,如果您想记录某些内容,有几个比 map
和 returning it
:
doOnNext
方法是为此量身定制的:对反应信号之一做出反应(在这种情况下,onNext
:发出一个值)并执行一些非变异动作,使输出序列与源序列完全相同。 doOn 的 "side-effect" 可以写入控制台或递增统计计数器,例如...还有 doOnComplete、doOnError、doOnSubscribe、doOnCancel 等...log
只是记录其上方序列中的所有事件。它将检测您是否使用 SLF4J,如果是,则在 DEBUG 级别使用配置的记录器。否则它将使用 JDK 日志记录功能(因此您还需要将其配置为显示 DEBUG 级别日志)。
关于交易的一句话 或者任何依赖于 ThreadLocal
的事情
ThreadLocal 和线程粘性在反应式编程中可能会出现问题,因为底层执行模型在整个序列中保持不变的保证较少。 Flux
可以分几个步骤执行,每个步骤在不同的 Scheduler
中执行(线程或线程池也是如此)。即使在特定步骤,一个值也可以由底层线程池的线程 A 处理,而下一个值稍后到达,将在线程 B 上处理。
在这种情况下,依赖 Thread Local 并不那么简单,我们目前正在积极致力于提供更适合反应世界的替代方案。
您创建连接池大小的池的想法很好,但不一定足够,事务流量可能会使用多个线程,因此事务可能会污染某些线程。
当池用完线程时会发生什么
如果您使用特定的 Scheduler
来隔离阻塞行为,那么一旦它用完线程,它就会抛出一个 RejectedExecutionException
.