Spring Webflux:从 Mono 中提取一个值并将其保存到另一个变量中

Spring Webflux: Extract a value from a Mono and save it into another variable

API中使用的堆栈是:Spring-boot,SpringWebflux,Hibernate Reactive 1.1.3 和 Postgresql。

我想从名为 desiredField 的 Mono 中提取一个字符串值,并将其添加到哈希图中。我正在展示发生这一切的数据库、模型、存储库和 util 模块。

JWTUtil 用于在 API 中生成用于登录目的的 JWT 令牌。 JWTController 工作正常,返回令牌作为响应!唯一的问题是无法从 Mono 中提取字符串值。

我试过两种方法。反应方式,使用订阅方法。而拦截方式使用的是block方法。下面显示了这两个方法及其自己的控制台打印。

在第一种方法(反应方式)中我无法从 Mono 中提取 desiredField。令牌出现在响应中,但令牌声明中没有 desiredField。 desiredField 仅在 de stream (查看控制台日志打印) 内部有效,外部为空。

第二种方法(阻塞方式)没有得到响应中的token就出现无限循环错误

你能帮我从 Mono 中提取这个值吗?

提前致谢!

Postgres:

############
# my_table #
############

id  |  desired_column
-------------------------------
 1  |  "I am the desired value"

型号:

@Data
@Entity(name = "MyModel")
@Table(name = "my_table")
public class MyModel{
    @Id
    @GeneratedValue
    private long id;

    @Column(name="desired_column")
    private String desiredField;
}

存储库:

@Slf4j
@Repository
public class MyModelRepository {

    public Mono<MyModel> getMyModelInstance() {
        try{
            EntityManagerFactory emf = Persistence.createEntityManagerFactory("my-persistance");
            Mutiny.SessionFactory factory = emf.unwrap(Mutiny.SessionFactory.class);

            CriteriaBuilder builder = factory.getCriteriaBuilder();
            CriteriaQuery<MyModel> criteria = builder.createQuery(MyModel.class);
            Root<MyModel> root = criteria.from(MyModel.class);
            criteria.where(builder.equal(root.get("id"), "1"));

            Uni<MyModel> myModelInstanceUni= factory.withSession(
                    session -> session
                            .createQuery(criteria)
                            .getSingleResult()
            );

            return myModelInstanceUni
                    .convert()
                    .with(UniReactorConverters.toMono());

        } catch(Exception ex) {
            log.error("There is an error: {}", ex.getMessage());
            return null;
        }
    }
}

方式 1:反应方式

实用程序:


@Slf4j
@Component
public class JWTUtil {

    @Autowired
    private MyModelRepository myModelRepository;

    @Value("${springbootwebfluxjjwt.jjwt.secret}")
    private String secret;

    @Value("${springbootwebfluxjjwt.jjwt.expiration}")
    private String expirationTime;

    private Key key;


    @PostConstruct
    public void init(){
        this.key = Keys.hmacShaKeyFor(secret.getBytes());
    }


    public Claims getAllClaimsFromToken(String token) {
        // Here get token claims
    }


    public String getUsernameFromToken(String token) {
        // Here get token username
    }


    public Date getExpirationDateFromToken(String token) {
        // Here get expiration date from token;
    }


    private Boolean isTokenExpired(String token) {
        // Here validates token
    }


    public String generateToken (User user) {

        ArrayList<Long> usr_id = new ArrayList<Long>(Collections.singleton(user.getId()));
        ArrayList<String> usr_name = new ArrayList<String>(Collections.singleton(user.getName()));
        ArrayList<String> usr_lastname = new ArrayList<>(Collections.singleton(user.getLastName()));

        Map<String, Object> claims = new HashMap<>();
        claims.put("usr_id", usu_id);
        claims.put("usr_name", usu_name);
        claims.put("usr_lastname", usu_lastname);

        String[] desiredValueFromInside= {null};

        Disposable myInstance= myModelRepository.getMyModelInstance()
                .log()
                .subscribeOn(Schedulers.parallel())
                .subscribe(
                    // On next
                    instance-> {
                        desiredValueFromInside[0] = instance.getDesiredField();
                        claims.put("desiredField", instance.getDesiredField());

                        log.info("SUBSCRIBING, instance.getDesiredField() IS: {}", impuesto.getImp_tasa());
                        log.info("SUBSCRIBING, THE VAR desiredValueFromInsideIS: {}", impuestoFromInside[0]);
                    },
                    // On error
                    error -> log.error("THERE S A PROBLEM: {}", error),
                    // On complete
                    () -> log.info("WE ARE DONE!")
                );

        log.info("DEBUGGING, FROM OUTSIDE desiredValueFromInside IS: {}", desiredValueFromInside[0]);
        log.info("DEBUGGING, FROM OUTSIDE claims.get(\"desiredField\") IS: {}", claims.get("desiredField"));

        return doGenerateToken(claims, user.getUsername());
    }


    private String doGenerateToken(Map<String, Object> claims, String username) {
        // Here generate token
    }


    public Boolean validateToken(String token) {
        // Here validates token
    }
}

控制台:

.
.
.
2022-03-02 11:21:02.485  INFO 44752 --- [ntloop-thread-0] .r.p.i.DefaultSqlClientPoolConfiguration : HR000025: Connection pool size: 100
2022-03-02 11:21:02.500  INFO 44752 --- [ntloop-thread-0] c.a.o.configuration.security.JWTUtil     : DEBUGGING, FROM OUTSIDE desiredValueFromInside IS: null
2022-03-02 11:21:02.500  INFO 44752 --- [ntloop-thread-0] c.a.o.configuration.security.JWTUtil     : DEBUGGING, FROM OUTSIDE claims.get(\"desiredField\") IS: null
2022-03-02 11:21:02.500  INFO 44752 --- [     parallel-2] reactor.Mono.FromPublisher.1             : onSubscribe(MonoNext.NextSubscriber)
2022-03-02 11:21:02.507  INFO 44752 --- [     parallel-2] reactor.Mono.FromPublisher.1             : request(unbounded)
.
.
.
2022-03-02 11:21:03.834  INFO 44752 --- [ntloop-thread-1] reactor.Mono.FromPublisher.1             : onNext(MyModel(id=1, desired_field="I am the desired value")
2022-03-02 11:21:03.849  INFO 44752 --- [ntloop-thread-1] c.a.o.configuration.security.JWTUtil     : SUBSCRIBING, instance.getDesiredField() IS: "I am the desired value"
2022-03-02 11:21:03.849  INFO 44752 --- [ntloop-thread-1] c.a.o.configuration.security.JWTUtil     : SUBSCRIBING, THE VAR desiredValueFromInsideIS: "I am the desired value"
2022-03-02 11:21:03.849  INFO 44752 --- [ntloop-thread-1] c.a.o.configuration.security.JWTUtil     : "WE ARE DONE!"
2022-03-02 11:21:03.849  INFO 44752 --- [ntloop-thread-1] reactor.Mono.FromPublisher.1             : onComplete()

方式 2:阻塞方式

实用程序:


@Slf4j
@Component
public class JWTUtil {

    @Autowired
    private MyModelRepository myModelRepository;

    @Value("${springbootwebfluxjjwt.jjwt.secret}")
    private String secret;

    @Value("${springbootwebfluxjjwt.jjwt.expiration}")
    private String expirationTime;

    private Key key;


    @PostConstruct
    public void init(){
        this.key = Keys.hmacShaKeyFor(secret.getBytes());
    }


    public Claims getAllClaimsFromToken(String token) {
        // Here get token claims
    }


    public String getUsernameFromToken(String token) {
        // Here get token username
    }


    public Date getExpirationDateFromToken(String token) {
        // Here get expiration date from token;
    }


    private Boolean isTokenExpired(String token) {
        // Here validates token
    }


    public String generateToken (User user) {

        ArrayList<Long> usr_id = new ArrayList<Long>(Collections.singleton(user.getId()));
        ArrayList<String> usr_name = new ArrayList<String>(Collections.singleton(user.getName()));
        ArrayList<String> usr_lastname = new ArrayList<>(Collections.singleton(user.getLastName()));

        Map<String, Object> claims = new HashMap<>();
        claims.put("usr_id", usu_id);
        claims.put("usr_name", usu_name);
        claims.put("usr_lastname", usu_lastname);

        String[] desiredValueFromInside= {null};

        String desiredField = myModelRepository.getMyModelInstance()
                .map(MyModel::getDesiredField)
                .share()
                .block();

        desiredValueFromInside[0] = instance.getDesiredField();
        claims.put("desiredField", instance.getDesiredField());

        log.info("DEBUGGING, FROM OUTSIDE desiredValueFromInside IS: {}", desiredValueFromInside[0]);
        log.info("DEBUGGING, FROM OUTSIDE claims.get(\"desiredField\") IS: {}", claims.get("desiredField"));

        return doGenerateToken(claims, user.getUsername());
    }


    private String doGenerateToken(Map<String, Object> claims, String username) {
        // Here generate token
    }


    public Boolean validateToken(String token) {
        // Here validates token
    }
}

控制台:此控制台日志存在无限循环。

.
.
.
2022-03-02 11:40:58.640  WARN 38436 --- [-thread-checker] io.vertx.core.impl.BlockedThreadChecker  : Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 9212 ms, time limit is 2000 ms

io.vertx.core.VertxException: Thread blocked
    at java.base@11.0.12/java.lang.Thread.sleep(Native Method) ~[na:na]
    at app//reactor.core.publisher.NextProcessor.block(NextProcessor.java:135) ~[reactor-core-3.4.14.jar:3.4.14]
    at app//reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:116) ~[reactor-core-3.4.14.jar:3.4.14]
.
.
.

我认为这里的要点是你组织代码的方式,在我看来你使用的是反应式框架,但以命令式的方式思考和编码。

如果您将 generateTokendoGenerateToken 更改为 return 单声道,您可以执行如下操作:

    private Mono<String> generateToken(User user) {
        ArrayList<Long> usr_id = new ArrayList<Long>(Collections.singleton(user.getId()));
        ArrayList<String> usr_name = new ArrayList<String>(Collections.singleton(user.getName()));
        ArrayList<String> usr_lastname = new ArrayList<>(Collections.singleton(user.getLastName()));

        return myModelRepository.getMyModelInstance()
                .map(modelInstance -> {
                    Map<String, Object> claims = new HashMap<>();

                    //put all values you need in claims map...

                    //update doGenerateToken code to return a Mono<String>
                    return doGenerateToken(claims, user.getUsername());
                });
    }