Webflux - Read mono (request) multiple times
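First, I am confused about whether there is any benefit to using reactive handler method parameters.
Second, I ran into problems with this technique when I needed to read such a parameter more than once.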
public Mono<TokenDto> generateToken(Mono<UserDto> user) {
    var userDto = user.block();
    return userRepository.findByEmail(userDto.getEmail())
            .filter(foundUser -> bCryptPasswordEncoder.matches(userDto.getPassword(), foundUser.getPassword()))
            .map(foundUser -> JWT.create()
                    .withSubject(foundUser.getEmail())
                    .withExpiresAt(new Date(System.currentTimeMillis() + jwtTtl.toMillis()))
                    .withClaim(USER_ROLES_CLAIM, foundUser.getRoles().stream().map(RoleEnum::name).collect(Collectors.toList()))
                    .sign(Algorithm.HMAC512(jwtSecret))
            )
            .map(TokenDto::new);
}
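As you can see, I have to block in order to actually read the parameter twice. I tried zipping it with the userRepository result, but I ended up with a Mono inside the Tuple2 structure.
Does Reactor offer a solution for this? Perhaps a function such as publish/repeat would be promising. I managed to build a pipeline, but reading the request body more than once always fails.
Thanks.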
Error:
{
"timestamp": "2020-07-03T06:24:23.592+00:00",
"path": "/login",
"status": 400,
"error": "Bad Request",
"message": "Request body is missing: public reactor.core.publisher.Mono<org.springframework.http.ResponseEntity<com.ostrozlik.taskagent.web.dto.TokenDto>> com.ostrozlik.taskagent.web.controller.UserController.login(reactor.core.publisher.Mono<com.ostrozlik.taskagent.web.dto.UserDto>)",
"requestId": "defc8953-1",
"trace": "org.springframework.web.server.ServerWebInputException: 400 BAD_REQUEST \"Request body is missing: public reactor.core.publisher.Mono<org.springframework.http.ResponseEntity<com.ostrozlik.taskagent.web.dto.TokenDto>> com.ostrozlik.taskagent.web.controller.UserController.login(reactor.core.publisher.Mono<com.ostrozlik.taskagent.web.dto.UserDto>)\"\n\tat org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.handleMissingBody(AbstractMessageReaderArgumentResolver.java:230)\n\tSuppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: \nError has been observed at the following site(s):\n\t|_ checkpoint ⇢ Handler com.ostrozlik.taskagent.web.controller.UserController#login(Mono) [DispatcherHandler]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ HTTP POST \"/login\" [ExceptionHandlingWebHandler]\nStack trace:\n\t\tat 
org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.handleMissingBody(AbstractMessageReaderArgumentResolver.java:230)\n\t\tat org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.lambda$readBody(AbstractMessageReaderArgumentResolver.java:194)\n\t\tat reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:70)\n\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4219)\n\t\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)\n\t\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:174)\n\t\tat reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:115)\n\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onComplete(FluxMapFuseable.java:336)\n\t\tat reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onComplete(FluxFilterFuseable.java:384)\n\t\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1783)\n\t\tat reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:152)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)\n\t\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)\n\t\tat reactor.core.publisher.Operators.complete(Operators.java:135)\n\t\tat reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:294)\n\t\tat reactor.netty.channel.FluxReceive.lambda$subscribe(FluxReceive.java:138)\n\t\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)\n\t\tat 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)\n\t\tat io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)\n\t\tat io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\tat java.base/java.lang.Thread.run(Thread.java:832)\n"
}
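The trace above suggests that the repeated read of the request body completed empty: the exception is raised from handleMissingBody after the body publisher signals onComplete. As a rough model of that behavior (not of Reactor Netty's actual implementation), the sketch below uses a plain JDK Supplier as a one-shot source. The names OneShotBodyDemo and oneShotBody are hypothetical and exist only for this illustration.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class OneShotBodyDemo {

    /** Hypothetical stand-in for a request body that can be consumed only once. */
    static Supplier<Optional<String>> oneShotBody(String payload) {
        AtomicBoolean consumed = new AtomicBoolean(false);
        // The first caller wins the compare-and-set and receives the payload;
        // every later caller gets an empty result, much like a missing body.
        return () -> consumed.compareAndSet(false, true)
                ? Optional.of(payload)
                : Optional.empty();
    }

    public static void main(String[] args) {
        Supplier<Optional<String>> body = oneShotBody("{\"email\":\"a@example.com\"}");
        System.out.println("first read present:  " + body.get().isPresent());
        System.out.println("second read present: " + body.get().isPresent());
    }
}
```

Under this model, any pipeline that reads the source twice sees nothing on the second read, which is why the approaches below aim to read the request exactly once.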
Here is what I came up with using zip. Why do you think that having a Mono in the Tuple2 structure is wrong?
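public Mono<TokenDto> generateToken(Mono<UserDto> user) {
    // Note: `user` appears twice in the zip, so it is subscribed to twice.
    return Mono.zip(user, user.flatMap(a -> userRepository.findByEmail(a.getEmail())))
            .filter(tuple2 -> bCryptPasswordEncoder.matches(tuple2.getT1().getPassword(), tuple2.getT2().getPassword()))
            .map(tuple2 -> tuple2.getT2())
            // your mapping logic; here, the JWT creation from the question
            .map(foundUser -> JWT.create()
                    .withSubject(foundUser.getEmail())
                    .withExpiresAt(new Date(System.currentTimeMillis() + jwtTtl.toMillis()))
                    .withClaim(USER_ROLES_CLAIM, foundUser.getRoles().stream().map(RoleEnum::name).collect(Collectors.toList()))
                    .sign(Algorithm.HMAC512(jwtSecret))
            )
            .map(TokenDto::new);
}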
You can do it like this, which does not subscribe to user twice.
public Mono<TokenDto> generateToken(Mono<UserDto> user) {
    return user
            .flatMap(userDto ->
                    userRepository
                            .findByEmail(userDto.getEmail())
                            // filter is inside flatMap, so userDto is still in scope
                            .filter(foundUser -> bCryptPasswordEncoder.matches(userDto.getPassword(), foundUser.getPassword()))
            )
            .map(foundUser -> JWT.create()
                    .withSubject(foundUser.getEmail())
                    .withExpiresAt(new Date(System.currentTimeMillis() + jwtTtl.toMillis()))
                    .withClaim(USER_ROLES_CLAIM, foundUser.getRoles().stream().map(RoleEnum::name).collect(Collectors.toList()))
                    .sign(Algorithm.HMAC512(jwtSecret))
            )
            .map(TokenDto::new);
}
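The same shape can be tried outside Spring. The sketch below is a JDK-only analogy, not Reactor code: CompletableFuture stands in for Mono, thenCompose for flatMap, and a Supplier with a counter stands in for subscribing to the request. Every name in it (NestedLookupDemo, Login, request, findPassword, authenticate) is hypothetical, and passwords are compared as plain strings only to keep the example short.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class NestedLookupDemo {

    /** Hypothetical login request: email plus the password the caller supplied. */
    static final class Login {
        final String email;
        final String password;

        Login(String email, String password) {
            this.email = email;
            this.password = password;
        }
    }

    /** Hypothetical user store: email mapped to the stored password. */
    static final Map<String, String> STORED = Map.of("a@example.com", "secret");

    /** Asynchronous lookup, standing in for userRepository.findByEmail. */
    static CompletableFuture<Optional<String>> findPassword(String email) {
        return CompletableFuture.completedFuture(Optional.ofNullable(STORED.get(email)));
    }

    /** Request source that counts each read, standing in for a subscription. */
    static Supplier<CompletableFuture<Login>> request(Login login, AtomicInteger reads) {
        return () -> {
            reads.incrementAndGet();
            return CompletableFuture.completedFuture(login);
        };
    }

    /** Reads the request once; the lookup is nested so `login` stays in scope. */
    static CompletableFuture<Optional<String>> authenticate(Supplier<CompletableFuture<Login>> request) {
        return request.get().thenCompose(login ->
                findPassword(login.email)
                        // the password check sits inside the composition,
                        // like the filter inside flatMap above
                        .thenApply(stored -> stored
                                .filter(p -> p.equals(login.password))
                                .map(p -> login.email)));
    }

    public static void main(String[] args) {
        AtomicInteger reads = new AtomicInteger();
        Supplier<CompletableFuture<Login>> req =
                request(new Login("a@example.com", "secret"), reads);
        System.out.println("authenticated: " + authenticate(req).join().isPresent());
        System.out.println("request reads: " + reads.get());
    }
}
```

The design point carries over: because the dependent lookup is nested inside the lambda that receives the request value, both values are available for the comparison without reading the source a second time.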