如何将元素添加到 Redis 哈希并使用无阻塞的 ReactiveRedisTemplate 设置过期时间?

How do you add elements to a Redis Hash and set the expiration using ReactiveRedisTemplate without block?

如果它不是反应性的,我会做类似的事情。

/**
 * Given a refresh token and the user name, create a "payload" to represent
 * secret data and store it into Redis as a hash and set it to expire in 30 seconds.
 */
Map<String, String> provideAuthenticatedData(String refreshTokenMono, String username) {
  var ops = redisTemplate.opsForHash();
  var payload = Map.of(
    "username", username,
    "secret", UUID.randomUUID().toString()
  );
  var puts = payload.entrySet()
    .stream()
    .map(e->ops.putIfAbsent(refreshToken, e.key(), e.value())
    .filter(success -> !success) // finds those that have failed
    .toList();
  if (!puts.isEmpty()) {
    throw new IllegalStateException("some elements failed to save");
  }
  var expireCheck = redisTemplate.expireAt(refreshToken, Instant.now().plusSeconds(30));
  if (!expireCheck) {
    throw new IllegalStateException("unable to expire");
  }
  return payload;
}

尝试用 Reactive 来做它看起来有点混乱,我在一点后卡住了

/**
 * Given a refresh token mono and the user name, create a "payload" to represent
 * secret data and store it into Redis as a hash and set it to expire in 30 seconds.
 */
Mono<Map<String, String>> provideAuthenticatedData(Mono<String> refreshTokenMono, String username) {
  var ops = reactiveRedisTemplate.opsForHash();
  var payload = Map.of(
    "username", username,
    "secret", UUID.randomUUID().toString()
  );
  return refreshTokenMono
    .flatMapIterable(
      refreshToken -> payload.entrySet()
        .stream()
        .map(
          e -> ops.putIfAbsent(refreshToken, e.getKey(), e.getValue())
        )
        .toList() // can't find an operator that would take a stream
    )
    // at this point I have a Flux<Mono<Boolean>>
    // somehow I have to find out if any of them are false then return a Mono.error()
    // then once all of it is done, set the key to expire
    // finally return the payload I originally created
}

我做的另一种方法是这个,但它不做任何错误处理。

Mono<Map<String, String>> provideAuthenticatedDataMono(
      Mono<String> refreshTokenMono, String username) {

  var ops = reactiveRedisTemplate.opsForHash();
  var payload = Map.of(
    "username", username,
    "secret", UUID.randomUUID().toString()
  );

  return refreshTokenMono
    .doOnNext(
        refreshToken ->
          payload
            .entrySet()
            .stream()
            .map(
              e ->  ops.putIfAbsent(
                refreshToken,
                e.getKey(),
                e.getValue())
             )
            .forEach(Mono::subscribe)
    )
    .doOnNext(
       refreshToken ->
         redisTemplate
           .expireAt(
             refreshToken,
             Instant.now().plusSeconds(30)
           )
           .subscribe()
    )
    .flatMap((x) -> just(payload));
  }
}

reactive 的主要思想是在流中工作,所以你应该避免在任何地方订阅你应该 return Flux,Mono Stream。

第一个示例不起作用,因为您没有订阅 redis 给您的单声道。

因为您使用的是响应式,所以为什么要将它与 java 流混合。

一个解决方案就像

  public Mono< YOUR_OBJECT > save(Object YOUR_OBJECT) {
return template.opsForValue().set(YOUR_OBJECT.key, YOUR_OBJECT)
    .filter(aBoolean -> aBoolean)
    .map(aBoolean -> YOUR_OBJECT)
    .switchIfEmpty(Mono.error(new RuntimeException("Could not save data to redis")));

}

你应该继续直播直到控制器或你订阅时结束