Executing Mono streams that no one subscribes to in Spring WebFlux
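I have a Spring WebFlux application. This application has two important parts:
- A job is scheduled to run at a fixed interval.
- The job fetches data from the database and stores it in Redis.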
void run() {
    redisAdapter.getTtl()
        .doOnError(RefreshExternalCache::logError)
        .switchIfEmpty(Mono.defer(() -> {
            log.debug(">> RefreshExternalCache > refreshExternalCacheIfNeeded => Remaining TTL could not be retrieved. Cache does not exist. " +
                "Trying to create the cache.");
            return Mono.just(Duration.ofSeconds(0));
        }))
        .subscribe(remainingTtl -> {
            log.debug(">> RefreshExternalCache > refreshExternalCacheIfNeeded => original ttl for the cache: {} | ttl for cache in seconds = {} | ttl for cache in minutes = {}",
                remainingTtl, remainingTtl.getSeconds(), remainingTtl.toMinutes());
            if (isExternalCacheRefreshNeeded(remainingTtl, offerServiceProperties.getExternalCacheExpiration(), offerServiceProperties.getExternalCacheRefreshPeriod())) {
                log.debug(">> RefreshExternalCache > refreshExternalCacheIfNeeded => external cache is up-to-date, skipping refresh");
            } else {
                log.debug(">> RefreshExternalCache > refreshExternalCacheIfNeeded => external cache is outdated, updating the external cache");
                offerService.refreshExternalCache();
            }
        });
}
This basically calls another method named refreshExternalCache(), which is implemented as follows:
public void refreshExternalCache() {
    fetchOffersFromSource()
        .doOnNext(offerData -> {
            log.debug(LOG_REFRESH_CACHE + "Updating local offer cache with data from source");
            localCache.put(OFFER_DATA_KEY, offerData);
            storeOffersInExternalCache(offerData, offerServiceProperties.getExternalCacheExpiration());
        })
        .doOnSuccess(offerData -> meterRegistry.counter(METRIC_EXTERNAL_CACHE_REFRESH_COUNTER, TAG_OUTCOME, SUCCESS).increment())
        .doOnError(sourceThrowable -> {
            log.debug(LOG_REFRESH_CACHE + "Error while refreshing external cache {}", sourceThrowable.getMessage());
            meterRegistry.counter(METRIC_EXTERNAL_CACHE_REFRESH_COUNTER, TAG_OUTCOME, FAILURE).increment();
        }).subscribe();
}
Also, in the method above you can see a call to storeOffersInExternalCache:
public void storeOffersInExternalCache(OfferData offerData, Duration ttl) {
    log.info(LOG_STORING_OFFER_DATA + "Storing the offer data in external cache...");
    redisAdapter.storeOffers(offerData, ttl);
}

public void storeOffers(OfferData offerData, Duration ttl) {
    Mono.fromRunnable(() -> redisClient.storeSerializedOffers(serializeFromDomain(offerData), ttl)
            .doOnNext(status -> {
                if (Boolean.TRUE.equals(status)) {
                    log.info(LOG_STORE_OFFERS + "Data stored in redis.");
                    meterRegistry.counter(METRIC_REDIS_STORE_OFFERS, TAG_OUTCOME, SUCCESS).increment();
                } else {
                    log.error(LOG_STORE_OFFERS + "Unable to store data in redis.");
                    meterRegistry.counter(METRIC_REDIS_STORE_OFFERS, TAG_OUTCOME, FAILURE).increment();
                }
            }).retryWhen(Retry.backoff(redisRetryProperties.getMaxAttempts(), redisRetryProperties.getWaitDuration()).jitter(redisRetryProperties.getBackoffJitter()))
            .doOnError(throwable -> {
                meterRegistry.counter(METRIC_REDIS_STORE_OFFERS, TAG_OUTCOME, FAILURE).increment();
                log.error(LOG_STORE_OFFERS + "Unable to store data in redis. Error: [{}]", throwable.getMessage());
            })).subscribeOn(Schedulers.boundedElastic());
}
Redis client:
@Slf4j
@Component
public class RedisClient {
    private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;
    private final ReactiveValueOperations<String, String> reactiveValueOps;

    public RedisClient(@Qualifier("reactiveRedisTemplate") ReactiveRedisTemplate<String, String> reactiveRedisTemplate) {
        this.reactiveRedisTemplate = reactiveRedisTemplate;
        this.reactiveValueOps = reactiveRedisTemplate.opsForValue();
    }

    Mono<Optional<String>> fetchSerializedOffers() {
        return reactiveValueOps.get(OFFER_DATA_KEY).map(Optional::ofNullable);
    }

    Mono<Boolean> storeSerializedOffers(String serializedOffers, Duration ttl) {
        return reactiveValueOps.set(OFFER_DATA_KEY, serializedOffers, ttl);
    }

    Mono<Duration> getTtl() {
        return reactiveRedisTemplate.getExpire(OFFER_DATA_KEY);
    }
}
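Now, my concerns are:
- If I don't call subscribe on these Mono streams, the methods are not executed at all. That is fair, since they won't execute until someone subscribes to them.
- As far as I understand, subscribe is a blocking call. That defeats the whole purpose of reactive programming, doesn't it?
- I looked for several ways to make this work; one of them is shown above. I tried calling one of the methods inside Mono.fromRunnable, but that is not a good approach either (I read that in another Stack Overflow thread).
So, is my approach above wrong? How do we execute Mono streams that no one subscribes to?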
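To make the first and third concerns concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The `store` method and the `writes` counter are hypothetical stand-ins for the Redis write, not the code from the question, and `block()` is used only to keep the demo deterministic. It suggests that assembling a Mono does no work, and that a Mono created inside `Mono.fromRunnable` is simply discarded because nothing subscribes to it.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

public class LazyMonoDemo {

    // Hypothetical counter: how many times the "write" actually ran.
    static final AtomicInteger writes = new AtomicInteger();

    // Hypothetical stand-in for a reactive Redis write.
    static Mono<Boolean> store(String payload) {
        return Mono.fromCallable(() -> {
            writes.incrementAndGet();
            return true;
        });
    }

    // Assembling a Mono without subscribing does no work.
    static int assembledOnly() {
        writes.set(0);
        store("offers");
        return writes.get();
    }

    // The Runnable runs on subscription, but the Mono it creates
    // is thrown away without ever being subscribed.
    static int wrappedInFromRunnable() {
        writes.set(0);
        Mono.fromRunnable(() -> store("offers")).block();
        return writes.get();
    }

    // Chaining with flatMap lets the single outer subscription drive the inner Mono.
    static int composed() {
        writes.set(0);
        Mono.just("offers").flatMap(LazyMonoDemo::store).block();
        return writes.get();
    }

    public static void main(String[] args) {
        System.out.println(assembledOnly());         // 0
        System.out.println(wrappedInFromRunnable()); // 0
        System.out.println(composed());              // 1
    }
}
```

Under these assumptions, a method such as storeOffers could return its Mono and let the caller chain it with flatMap, so that one subscribe() at the end of the scheduled job drives the whole pipeline.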
Answering your second question (which seems to be the only real doubt in your question): not really. block() (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#block--) is the one that subscribes to a Mono or Flux and waits indefinitely until a next signal is received. On the other hand, subscribe() (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#subscribe--) subscribes to a Mono or Flux but does not block; instead, it reacts when an element is emitted.
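The difference can be observed with a small sketch, assuming reactor-core is on the classpath. The class and method names are made up for illustration, and the 200 ms delay is an arbitrary value chosen so the ordering is visible.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Mono;

public class SubscribeVsBlock {

    // subscribe() returns at once; the value arrives later on a Reactor timer thread.
    static List<String> subscribeDemo() throws InterruptedException {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        Mono<String> delayed = Mono.delay(Duration.ofMillis(200)).map(tick -> "value");

        delayed.subscribe(value -> {
            events.add("received " + value);
            done.countDown();
        });
        events.add("after subscribe()");

        // The latch only keeps the demo alive until the callback has fired.
        done.await(5, TimeUnit.SECONDS);
        return events;
    }

    // block() parks the calling thread until the value is available.
    static List<String> blockDemo() {
        List<String> events = new ArrayList<>();

        Mono<String> delayed = Mono.delay(Duration.ofMillis(200)).map(tick -> "value");

        String value = delayed.block();
        events.add("received " + value);
        events.add("after block()");
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(subscribeDemo()); // [after subscribe(), received value]
        System.out.println(blockDemo());     // [received value, after block()]
    }
}
```

Note that subscribe() adds no waiting of its own, but the thread that does the work depends on the source: a purely synchronous source such as Mono.just(...) runs its callbacks on the calling thread unless subscribeOn or publishOn moves them elsewhere.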