Spring Webflux: Extract a value from a Mono and save it into another variable
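The stack used in the API is: Spring Boot, Spring WebFlux, Hibernate Reactive 1.1.3 and PostgreSQL.
I want to extract a String value, called desiredField, from a Mono and add it to a HashMap. Below I show the database, model, repository and util modules where all of this happens.
JWTUtil is used to generate the JWT token for login purposes in the API. JWTController works fine and returns the token in the response! The only problem is that I cannot extract the String value from the Mono.
I have tried two ways: the reactive way, using the subscribe method, and the blocking way, using the block method. Both are shown below, each with its own console output.
In the first way (reactive), I cannot extract desiredField from the Mono. The token appears in the response, but desiredField is missing from the token claims. desiredField is only available inside the stream (see the console log output); outside it is null.
In the second way (blocking), the console reports an error in an infinite loop and no token is returned in the response.
Can you help me extract this value from the Mono?
Thanks in advance!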
Postgres:
############
# my_table #
############
id | desired_column
-------------------------------
1 | "I am the desired value"
Model:
@Data
@Entity(name = "MyModel")
@Table(name = "my_table")
public class MyModel {
    @Id
    @GeneratedValue
    private long id;

    @Column(name = "desired_column")
    private String desiredField;
}
Repository:
@Slf4j
@Repository
public class MyModelRepository {
    public Mono<MyModel> getMyModelInstance() {
        try {
            EntityManagerFactory emf = Persistence.createEntityManagerFactory("my-persistance");
            Mutiny.SessionFactory factory = emf.unwrap(Mutiny.SessionFactory.class);
            CriteriaBuilder builder = factory.getCriteriaBuilder();
            CriteriaQuery<MyModel> criteria = builder.createQuery(MyModel.class);
            Root<MyModel> root = criteria.from(MyModel.class);
            criteria.where(builder.equal(root.get("id"), "1"));
            Uni<MyModel> myModelInstanceUni = factory.withSession(
                session -> session
                    .createQuery(criteria)
                    .getSingleResult()
            );
            return myModelInstanceUni
                .convert()
                .with(UniReactorConverters.toMono());
        } catch (Exception ex) {
            log.error("There is an error: {}", ex.getMessage());
            return null;
        }
    }
}
Way 1: the reactive way
Util:
@Slf4j
@Component
public class JWTUtil {
    @Autowired
    private MyModelRepository myModelRepository;

    @Value("${springbootwebfluxjjwt.jjwt.secret}")
    private String secret;

    @Value("${springbootwebfluxjjwt.jjwt.expiration}")
    private String expirationTime;

    private Key key;

    @PostConstruct
    public void init() {
        this.key = Keys.hmacShaKeyFor(secret.getBytes());
    }

    public Claims getAllClaimsFromToken(String token) {
        // Here get token claims
    }

    public String getUsernameFromToken(String token) {
        // Here get token username
    }

    public Date getExpirationDateFromToken(String token) {
        // Here get expiration date from token;
    }

    private Boolean isTokenExpired(String token) {
        // Here validates token
    }

    public String generateToken(User user) {
        ArrayList<Long> usr_id = new ArrayList<Long>(Collections.singleton(user.getId()));
        ArrayList<String> usr_name = new ArrayList<String>(Collections.singleton(user.getName()));
        ArrayList<String> usr_lastname = new ArrayList<>(Collections.singleton(user.getLastName()));
        Map<String, Object> claims = new HashMap<>();
        claims.put("usr_id", usr_id);
        claims.put("usr_name", usr_name);
        claims.put("usr_lastname", usr_lastname);
        String[] desiredValueFromInside = {null};
        Disposable myInstance = myModelRepository.getMyModelInstance()
            .log()
            .subscribeOn(Schedulers.parallel())
            .subscribe(
                // On next
                instance -> {
                    desiredValueFromInside[0] = instance.getDesiredField();
                    claims.put("desiredField", instance.getDesiredField());
                    log.info("SUBSCRIBING, instance.getDesiredField() IS: {}", instance.getDesiredField());
                    log.info("SUBSCRIBING, THE VAR desiredValueFromInsideIS: {}", desiredValueFromInside[0]);
                },
                // On error
                error -> log.error("THERE S A PROBLEM: {}", error),
                // On complete
                () -> log.info("WE ARE DONE!")
            );
        log.info("DEBUGGING, FROM OUTSIDE desiredValueFromInside IS: {}", desiredValueFromInside[0]);
        log.info("DEBUGGING, FROM OUTSIDE claims.get(\"desiredField\") IS: {}", claims.get("desiredField"));
        return doGenerateToken(claims, user.getUsername());
    }

    private String doGenerateToken(Map<String, Object> claims, String username) {
        // Here generate token
    }

    public Boolean validateToken(String token) {
        // Here validates token
    }
}
Console:
.
.
.
2022-03-02 11:21:02.485 INFO 44752 --- [ntloop-thread-0] .r.p.i.DefaultSqlClientPoolConfiguration : HR000025: Connection pool size: 100
2022-03-02 11:21:02.500 INFO 44752 --- [ntloop-thread-0] c.a.o.configuration.security.JWTUtil : DEBUGGING, FROM OUTSIDE desiredValueFromInside IS: null
2022-03-02 11:21:02.500 INFO 44752 --- [ntloop-thread-0] c.a.o.configuration.security.JWTUtil : DEBUGGING, FROM OUTSIDE claims.get(\"desiredField\") IS: null
2022-03-02 11:21:02.500 INFO 44752 --- [ parallel-2] reactor.Mono.FromPublisher.1 : onSubscribe(MonoNext.NextSubscriber)
2022-03-02 11:21:02.507 INFO 44752 --- [ parallel-2] reactor.Mono.FromPublisher.1 : request(unbounded)
.
.
.
2022-03-02 11:21:03.834 INFO 44752 --- [ntloop-thread-1] reactor.Mono.FromPublisher.1 : onNext(MyModel(id=1, desired_field="I am the desired value")
2022-03-02 11:21:03.849 INFO 44752 --- [ntloop-thread-1] c.a.o.configuration.security.JWTUtil : SUBSCRIBING, instance.getDesiredField() IS: "I am the desired value"
2022-03-02 11:21:03.849 INFO 44752 --- [ntloop-thread-1] c.a.o.configuration.security.JWTUtil : SUBSCRIBING, THE VAR desiredValueFromInsideIS: "I am the desired value"
2022-03-02 11:21:03.849 INFO 44752 --- [ntloop-thread-1] c.a.o.configuration.security.JWTUtil : "WE ARE DONE!"
2022-03-02 11:21:03.849 INFO 44752 --- [ntloop-thread-1] reactor.Mono.FromPublisher.1 : onComplete()
Way 2: the blocking way
Util:
@Slf4j
@Component
public class JWTUtil {
    @Autowired
    private MyModelRepository myModelRepository;

    @Value("${springbootwebfluxjjwt.jjwt.secret}")
    private String secret;

    @Value("${springbootwebfluxjjwt.jjwt.expiration}")
    private String expirationTime;

    private Key key;

    @PostConstruct
    public void init() {
        this.key = Keys.hmacShaKeyFor(secret.getBytes());
    }

    public Claims getAllClaimsFromToken(String token) {
        // Here get token claims
    }

    public String getUsernameFromToken(String token) {
        // Here get token username
    }

    public Date getExpirationDateFromToken(String token) {
        // Here get expiration date from token;
    }

    private Boolean isTokenExpired(String token) {
        // Here validates token
    }

    public String generateToken(User user) {
        ArrayList<Long> usr_id = new ArrayList<Long>(Collections.singleton(user.getId()));
        ArrayList<String> usr_name = new ArrayList<String>(Collections.singleton(user.getName()));
        ArrayList<String> usr_lastname = new ArrayList<>(Collections.singleton(user.getLastName()));
        Map<String, Object> claims = new HashMap<>();
        claims.put("usr_id", usr_id);
        claims.put("usr_name", usr_name);
        claims.put("usr_lastname", usr_lastname);
        String[] desiredValueFromInside = {null};
        String desiredField = myModelRepository.getMyModelInstance()
            .map(MyModel::getDesiredField)
            .share()
            .block();
        desiredValueFromInside[0] = desiredField;
        claims.put("desiredField", desiredField);
        log.info("DEBUGGING, FROM OUTSIDE desiredValueFromInside IS: {}", desiredValueFromInside[0]);
        log.info("DEBUGGING, FROM OUTSIDE claims.get(\"desiredField\") IS: {}", claims.get("desiredField"));
        return doGenerateToken(claims, user.getUsername());
    }

    private String doGenerateToken(Map<String, Object> claims, String username) {
        // Here generate token
    }

    public Boolean validateToken(String token) {
        // Here validates token
    }
}
Console: this console log repeats in an infinite loop.
.
.
.
2022-03-02 11:40:58.640 WARN 38436 --- [-thread-checker] io.vertx.core.impl.BlockedThreadChecker : Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 9212 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
at java.base@11.0.12/java.lang.Thread.sleep(Native Method) ~[na:na]
at app//reactor.core.publisher.NextProcessor.block(NextProcessor.java:135) ~[reactor-core-3.4.14.jar:3.4.14]
at app//reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:116) ~[reactor-core-3.4.14.jar:3.4.14]
.
.
.
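I think the main point here is the way you are organizing your code: it seems to me that you are using a reactive framework but thinking and coding in an imperative way.
If you change generateToken and doGenerateToken to return a Mono, you can do something like this: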
private Mono<String> generateToken(User user) {
    ArrayList<Long> usr_id = new ArrayList<Long>(Collections.singleton(user.getId()));
    ArrayList<String> usr_name = new ArrayList<String>(Collections.singleton(user.getName()));
    ArrayList<String> usr_lastname = new ArrayList<>(Collections.singleton(user.getLastName()));
    return myModelRepository.getMyModelInstance()
        // flatMap rather than map: once doGenerateToken returns a Mono<String>,
        // map would produce a nested Mono<Mono<String>>
        .flatMap(modelInstance -> {
            Map<String, Object> claims = new HashMap<>();
            // put all values you need in the claims map...
            claims.put("desiredField", modelInstance.getDesiredField());
            // update doGenerateToken code to return a Mono<String>
            return doGenerateToken(claims, user.getUsername());
        });
}
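To see why the value is null outside the callback but available inside the chain, here is a minimal sketch that uses only the JDK. It is not Reactor code: CompletableFuture stands in for Mono (thenApply plays roughly the role of map, thenCompose that of flatMap), and the class name MonoTimingSketch, the user name "alice" and the body of doGenerateToken are assumptions made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustration only: CompletableFuture is used as a JDK stand-in for Mono.
final class MonoTimingSketch {

    // Hypothetical token builder; the real one would sign a JWT.
    public static String doGenerateToken(Map<String, Object> claims, String username) {
        return username + ":" + claims.get("desiredField");
    }

    // Composed style: the token is built inside the chain, so it is only
    // computed once the asynchronous value has actually arrived.
    public static CompletableFuture<String> generateToken(
            CompletableFuture<String> desiredField, String username) {
        return desiredField.thenApply(value -> {
            Map<String, Object> claims = new HashMap<>();
            claims.put("desiredField", value);
            return doGenerateToken(claims, username);
        });
    }

    public static void main(String[] args) {
        // The "database result" has not arrived yet.
        CompletableFuture<String> source = new CompletableFuture<>();

        // Imperative style: register a callback, then read the map right away.
        Map<String, Object> claims = new HashMap<>();
        source.thenAccept(value -> claims.put("desiredField", value));
        System.out.println("before completion: " + claims.get("desiredField"));
        // prints "before completion: null"

        CompletableFuture<String> token = generateToken(source, "alice");

        // The value arrives; pending callbacks run now, on this thread.
        source.complete("I am the desired value");
        System.out.println("after completion: " + claims.get("desiredField"));
        // prints "after completion: I am the desired value"
        System.out.println(token.join());
        // prints "alice:I am the desired value"
    }
}
```

The first print corresponds to the "FROM OUTSIDE ... null" lines in the question's log: the method had already moved on before the value was emitted. Returning the composed future (or, in WebFlux, the Mono) and letting the caller subscribe to it avoids both the null and the need to block.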