Spring webflux 某些方法在没有订阅或阻止的情况下无法工作

Spring webflux some methods not work without subscribe or block

响应式编程和 Spring Webflux 的新手,我有一种方法可以从 Redis 获取值并在特定条件下使密钥过期。但是代码过期键总是不起作用。

我目前的实现:

private Mono<MyObject> getMyObjectFromCache(String url) {
    RMapReactive<String, String> rMap = redissonReactiveClient.getMap(url);
    return rMap.readAllMap()
            .flatMap(m ->
                    rMap.remainTimeToLive()
                            .flatMap(ttl -> {
                                        final long renewalThreshold = 60 * 60 * 1000;
                                        if (ttl <= renewalThreshold) {
                                            System.out.println("start expiring");
                                            // it doesn't work without subscribe()
                                            rMap.expire(2, TimeUnit.HOURS);
                                        }
                                        return Mono.just(JSONObject.parseObject(JSON.toJSONString(m), MyObject.class));
                                    }
                            )
            );
}

过期方法returnsMono<Boolean>

    public Mono<MyObject> getMyObjInfo(String url) {
        // something else
        return getMyObjectFromFromCache(url).switchIfEmpty(Mono.defer(() -> getMyObjectFromRemoteService(url)));
    }

自定义网关过滤器

            @Override
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                ServerHttpRequest request = exchange.getRequest();
                ServerHttpResponse response = exchange.getResponse();
                URI uri = request.getURI();
                
                return getMyObjInfo(uri.getPath())
                        .flatMap(api -> {
                            // something else
                            return chain.filter(exchange.mutate().request(request).build());
                        });

当我测试过滤器时,只打印开始过期,但它不起作用。

如果我添加 subscribeblock,它可以工作。显然这不是一个好主意,我不应该破坏反应器链。

请问我有正确的写法吗?

谢谢

在反应式中,您需要将所有异步操作组合成一个流程,使用各种反应式运算符(组装时间)链接发布者(Mono/Flux),然后订阅它(订阅时间) .你是对的,明确地调用 subscribe 是一种不好的做法,应该避免。 Spring WebFlux 在后台订阅提供的流程。

在您的代码中,您通过不链接 rMap.expire(2, TimeUnit.HOURS); 来破坏流程。你可以像这样重写代码

private Mono<MyObject> getMyObjectFromCache(String url) {
    RMapReactive<String, String> rMap = redissonReactiveClient.getMap(url);
    return rMap.readAllMap()
            .flatMap(m ->
                    rMap.remainTimeToLive()
                            .flatMap(ttl -> {
                                final long renewalThreshold = 60 * 60 * 1000;
                                if (ttl <= renewalThreshold) {
                                    System.out.println("start expiring");
                                    // it doesn't work without subscribe()
                                    return rMap.expire(2, TimeUnit.HOURS);
                                }
                                return Mono.just(false);
                            })
                            .then(JSONObject.parseObject(JSON.toJSONString(m), MyObject.class))
            );
}