如何将元素添加到 Redis 哈希并使用无阻塞的 ReactiveRedisTemplate 设置过期时间?
How do you add elements to a Redis Hash and set the expiration using ReactiveRedisTemplate without block?
如果它不是反应性的,我会做类似的事情。
/**
* Given a refresh token and the user name, create a "payload" to represent
* secret data and store it into Redis as a hash and set it to expire in 30 seconds.
*/
Map<String, String> provideAuthenticatedData(String refreshTokenMono, String username) {
var ops = redisTemplate.opsForHash();
var payload = Map.of(
"username", username,
"secret", UUID.randomUUID().toString()
);
var puts = payload.entrySet()
.stream()
.map(e->ops.putIfAbsent(refreshToken, e.key(), e.value())
.filter(success -> !success) // finds those that have failed
.toList();
if (!puts.isEmpty()) {
throw new IllegalStateException("some elements failed to save");
}
var expireCheck = redisTemplate.expireAt(refreshToken, Instant.now().plusSeconds(30));
if (!expireCheck) {
throw new IllegalStateException("unable to expire");
}
return payload;
}
尝试用 Reactive 来做它看起来有点混乱,我在一点后卡住了
/**
* Given a refresh token mono and the user name, create a "payload" to represent
* secret data and store it into Redis as a hash and set it to expire in 30 seconds.
*/
Mono<Map<String, String>> provideAuthenticatedData(Mono<String> refreshTokenMono, String username) {
var ops = reactiveRedisTemplate.opsForHash();
var payload = Map.of(
"username", username,
"secret", UUID.randomUUID().toString()
);
return refreshTokenMono
.flatMapIterable(
refreshToken -> payload.entrySet()
.stream()
.map(
e -> ops.putIfAbsent(refreshToken, e.getKey(), e.getValue())
)
.toList() // can't find an operator that would take a stream
)
// at this point I have a Flux<Mono<Boolean>>
// somehow I have to find out if any of them are false then return a Mono.error()
// then once all of it is done, set the key to expire
// finally return the payload I originally created
}
我做的另一种方法是这个,但它不做任何错误处理。
Mono<Map<String, String>> provideAuthenticatedDataMono(
Mono<String> refreshTokenMono, String username) {
var ops = reactiveRedisTemplate.opsForHash();
var payload = Map.of(
"username", username,
"secret", UUID.randomUUID().toString()
);
return refreshTokenMono
.doOnNext(
refreshToken ->
payload
.entrySet()
.stream()
.map(
e -> ops.putIfAbsent(
refreshToken,
e.getKey(),
e.getValue())
)
.forEach(Mono::subscribe)
)
.doOnNext(
refreshToken ->
redisTemplate
.expireAt(
refreshToken,
Instant.now().plusSeconds(30)
)
.subscribe()
)
.flatMap((x) -> just(payload));
}
}
reactive 的主要思想是在流中工作,所以你应该避免在任何地方订阅你应该 return Flux,Mono Stream。
第一个示例不起作用,因为您没有订阅 redis 给您的单声道。
因为您使用的是响应式,所以为什么要将它与 java 流混合。
一个解决方案就像
public Mono< YOUR_OBJECT > save(Object YOUR_OBJECT) {
return template.opsForValue().set(YOUR_OBJECT.key, YOUR_OBJECT)
.filter(aBoolean -> aBoolean)
.map(aBoolean -> YOUR_OBJECT)
.switchIfEmpty(Mono.error(new RuntimeException("Could not save data to redis")));
}
你应该继续直播直到控制器或你订阅时结束
如果它不是反应性的,我会做类似的事情。
/**
* Given a refresh token and the user name, create a "payload" to represent
* secret data and store it into Redis as a hash and set it to expire in 30 seconds.
*/
Map<String, String> provideAuthenticatedData(String refreshTokenMono, String username) {
var ops = redisTemplate.opsForHash();
var payload = Map.of(
"username", username,
"secret", UUID.randomUUID().toString()
);
var puts = payload.entrySet()
.stream()
.map(e->ops.putIfAbsent(refreshToken, e.key(), e.value())
.filter(success -> !success) // finds those that have failed
.toList();
if (!puts.isEmpty()) {
throw new IllegalStateException("some elements failed to save");
}
var expireCheck = redisTemplate.expireAt(refreshToken, Instant.now().plusSeconds(30));
if (!expireCheck) {
throw new IllegalStateException("unable to expire");
}
return payload;
}
尝试用 Reactive 来做它看起来有点混乱,我在一点后卡住了
/**
* Given a refresh token mono and the user name, create a "payload" to represent
* secret data and store it into Redis as a hash and set it to expire in 30 seconds.
*/
Mono<Map<String, String>> provideAuthenticatedData(Mono<String> refreshTokenMono, String username) {
var ops = reactiveRedisTemplate.opsForHash();
var payload = Map.of(
"username", username,
"secret", UUID.randomUUID().toString()
);
return refreshTokenMono
.flatMapIterable(
refreshToken -> payload.entrySet()
.stream()
.map(
e -> ops.putIfAbsent(refreshToken, e.getKey(), e.getValue())
)
.toList() // can't find an operator that would take a stream
)
// at this point I have a Flux<Mono<Boolean>>
// somehow I have to find out if any of them are false then return a Mono.error()
// then once all of it is done, set the key to expire
// finally return the payload I originally created
}
我做的另一种方法是这个,但它不做任何错误处理。
Mono<Map<String, String>> provideAuthenticatedDataMono(
Mono<String> refreshTokenMono, String username) {
var ops = reactiveRedisTemplate.opsForHash();
var payload = Map.of(
"username", username,
"secret", UUID.randomUUID().toString()
);
return refreshTokenMono
.doOnNext(
refreshToken ->
payload
.entrySet()
.stream()
.map(
e -> ops.putIfAbsent(
refreshToken,
e.getKey(),
e.getValue())
)
.forEach(Mono::subscribe)
)
.doOnNext(
refreshToken ->
redisTemplate
.expireAt(
refreshToken,
Instant.now().plusSeconds(30)
)
.subscribe()
)
.flatMap((x) -> just(payload));
}
}
reactive 的主要思想是在流中工作,所以你应该避免在任何地方订阅你应该 return Flux,Mono Stream。
第一个示例不起作用,因为您没有订阅 redis 给您的单声道。
因为您使用的是响应式,所以为什么要将它与 java 流混合。
一个解决方案就像
public Mono< YOUR_OBJECT > save(Object YOUR_OBJECT) {
return template.opsForValue().set(YOUR_OBJECT.key, YOUR_OBJECT)
.filter(aBoolean -> aBoolean)
.map(aBoolean -> YOUR_OBJECT)
.switchIfEmpty(Mono.error(new RuntimeException("Could not save data to redis")));
}
你应该继续直播直到控制器或你订阅时结束