为什么 Redis 不在 __keyevent@*__:expired 主题事件上调用我的 MessageListener?
Why doesn't Redis call my MessageListener on __keyevent@*__:expired topic events?
我最近将 Spring-Boot 升级到 2.1.4.RELEASE,将 Spring-Cloud 升级到 Greenwich.SR1。我的服务运行在 Java 11 上。我对 Redis 的唯一依赖是 spring-boot-starter-data-redis。虽然我已经在 Redis 上设置了 notify-keyspace-events Ex,但我似乎仍然无法接收到任何键过期(key expired)事件。这是我第一次打算利用此类事件来实现超时处理。可能是哪里出了问题?
请帮忙!
这是我的 Redis 配置:
@Configuration
public class RedisConfiguration {
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.verification-code-topic}")
private String verificationCodeTopic;
@Bean
public RedisConnectionFactory redisConnectionFactory(){
RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
configuration.setHostName(host);
configuration.setPort(Integer.valueOf(port));
return new LettuceConnectionFactory(configuration);
}
@Bean
@Primary
public RedisTemplate<FundRedisKey, ResetPasswordRequest> resetPasswordRedisTemplate(){
RedisTemplate<FundRedisKey, ResetPasswordRequest> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setValueSerializer(resetPasswordRequestSerializer());
redisTemplate.setKeySerializer(redisKeySerializer());
return redisTemplate;
}
@Bean
public RedisTemplate<FundRedisKey, VerificationMessage> verificationMessageRedisTemplate(){
RedisTemplate<FundRedisKey, VerificationMessage> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setValueSerializer(verificationMessageSerializer());
redisTemplate.setKeySerializer(redisKeySerializer());
return redisTemplate;
}
@Bean
@Primary
public MessageListener verificationCodeMessageListener(){
return new VerificationCodeSubscriber(verificationMessageSerializer(),
resetPasswordRedisTemplate());
}
@Bean
public MessageListener resetPasswordTimeoutListener(){
return new ResetPasswordTimeoutSubscriber(resetPasswordRequestSerializer());
}
@Bean
@Primary
public MessageListenerAdapter verificationCodeMessageListenerAdapter(){
return new MessageListenerAdapter(verificationCodeMessageListener());
}
@Bean
public MessageListenerAdapter resetPasswordTimeoutMessageListenerAdapter(){
return new MessageListenerAdapter(resetPasswordTimeoutListener());
}
@Bean
public ChannelTopic verificationCodeTopic(){
return new ChannelTopic(verificationCodeTopic);
}
@Bean
@DependsOn(value = "taskExecutor")
public RedisMessageListenerContainer fundMessageListenerContainer(
@Qualifier("taskExecutor")Executor executor){
RedisMessageListenerContainer messageListenerContainer = new RedisMessageListenerContainer();
messageListenerContainer.setConnectionFactory(redisConnectionFactory());
messageListenerContainer.addMessageListener(
verificationCodeMessageListenerAdapter(), verificationCodeTopic());
messageListenerContainer.addMessageListener(
resetPasswordTimeoutMessageListenerAdapter(), new PatternTopic("__keyevent@*__:expired"));
messageListenerContainer.setTaskExecutor(executor);
return messageListenerContainer;
}
@Bean
public MessagePublisher verificationCodeMessagePublisher(){
return new VerificationCodePublisher(
verificationMessageRedisTemplate(), verificationCodeTopic());
}
@Bean
public RedisSerializer verificationMessageSerializer(){
return new Jackson2JsonRedisSerializer(VerificationMessage.class);
}
@Bean
@Primary
public RedisSerializer resetPasswordRequestSerializer(){
return new Jackson2JsonRedisSerializer(ResetPasswordRequest.class);
}
@Bean
public RedisSerializer redisKeySerializer(){
return new Jackson2JsonRedisSerializer(FundRedisKey.class);
}
}
这是我的 ResetPasswordTimeoutSubscriber:
/**
 * Redis message listener that deserializes the body of each received message as a
 * {@code ResetPasswordRequest}. The follow-up notification is not implemented yet.
 */
@Component
public class ResetPasswordTimeoutSubscriber implements MessageListener {

    @Value("${spring.redis.key}")
    private String key;

    // Wildcard instead of a raw type; callers passing a raw RedisSerializer still compile.
    private final RedisSerializer<?> messageSerializer;

    /**
     * @param messageSerializer serializer used to decode the message body
     */
    public ResetPasswordTimeoutSubscriber(RedisSerializer<?> messageSerializer) {
        this.messageSerializer = messageSerializer;
    }

    /**
     * Decodes the message body; the decoded request is not used yet (see TODO).
     *
     * @param message the received Redis message
     * @param bytes   second listener argument, unused here
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        // NOTE(review): when this listener is subscribed to "expired" key-events, the Redis
        // keyspace-notification docs say the body is the expired key's name, not the stored
        // value. Confirm the injected serializer matches that payload.
        ResetPasswordRequest resetPasswordRequest =
                (ResetPasswordRequest) messageSerializer.deserialize(message.getBody());
        //TODO Send operation timeout notification
    }
}
这是我的 TaskExecutor 配置
/**
 * Async and scheduling configuration: provides the {@code taskExecutor} bean and the
 * scheduler used for scheduled tasks, both sized from {@code JHipsterProperties}.
 */
@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer, SchedulingConfigurer {

    // One logger per class rather than per instance.
    private static final Logger LOG = LoggerFactory.getLogger(AsyncConfiguration.class);

    private final JHipsterProperties jHipsterProperties;

    /**
     * @param jHipsterProperties source of the async pool settings
     */
    public AsyncConfiguration(JHipsterProperties jHipsterProperties) {
        this.jHipsterProperties = jHipsterProperties;
    }

    /**
     * Creates a thread-pool executor from the configured core size, max size and queue
     * capacity, wrapped in an {@code ExceptionHandlingAsyncTaskExecutor}.
     *
     * @return the executor registered under the bean name {@code taskExecutor}
     */
    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        LOG.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
        executor.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
        executor.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
        executor.setThreadNamePrefix("app-1-Executor-");
        return new ExceptionHandlingAsyncTaskExecutor(executor);
    }

    /**
     * @return a {@code SimpleAsyncUncaughtExceptionHandler} for uncaught async exceptions
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }

    /**
     * Sets {@link #scheduledTaskExecutor()} as the scheduler for scheduled tasks.
     *
     * @param taskRegistrar the registrar to configure
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(scheduledTaskExecutor());
    }

    /**
     * Scheduled thread pool whose size is the configured async core pool size.
     *
     * @return the scheduled executor service
     */
    @Bean
    public ScheduledExecutorService scheduledTaskExecutor() {
        return Executors.newScheduledThreadPool(jHipsterProperties.getAsync().getCorePoolSize());
    }
}
我之前没有在 redisKeySerializer 方法上添加 @Bean 注解。我已将修正后的正确代码更新到问题正文中。
我最近将 Spring-Boot 升级到 2.1.4.RELEASE,将 Spring-Cloud 升级到 Greenwich.SR1。我的服务运行在 Java 11 上。我对 Redis 的唯一依赖是 spring-boot-starter-data-redis。虽然我已经在 Redis 上设置了 notify-keyspace-events Ex,但我似乎仍然无法接收到任何键过期(key expired)事件。这是我第一次打算利用此类事件来实现超时处理。可能是哪里出了问题?
请帮忙!
这是我的 Redis 配置:
@Configuration
public class RedisConfiguration {
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.verification-code-topic}")
private String verificationCodeTopic;
@Bean
public RedisConnectionFactory redisConnectionFactory(){
RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
configuration.setHostName(host);
configuration.setPort(Integer.valueOf(port));
return new LettuceConnectionFactory(configuration);
}
@Bean
@Primary
public RedisTemplate<FundRedisKey, ResetPasswordRequest> resetPasswordRedisTemplate(){
RedisTemplate<FundRedisKey, ResetPasswordRequest> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setValueSerializer(resetPasswordRequestSerializer());
redisTemplate.setKeySerializer(redisKeySerializer());
return redisTemplate;
}
@Bean
public RedisTemplate<FundRedisKey, VerificationMessage> verificationMessageRedisTemplate(){
RedisTemplate<FundRedisKey, VerificationMessage> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setValueSerializer(verificationMessageSerializer());
redisTemplate.setKeySerializer(redisKeySerializer());
return redisTemplate;
}
@Bean
@Primary
public MessageListener verificationCodeMessageListener(){
return new VerificationCodeSubscriber(verificationMessageSerializer(),
resetPasswordRedisTemplate());
}
@Bean
public MessageListener resetPasswordTimeoutListener(){
return new ResetPasswordTimeoutSubscriber(resetPasswordRequestSerializer());
}
@Bean
@Primary
public MessageListenerAdapter verificationCodeMessageListenerAdapter(){
return new MessageListenerAdapter(verificationCodeMessageListener());
}
@Bean
public MessageListenerAdapter resetPasswordTimeoutMessageListenerAdapter(){
return new MessageListenerAdapter(resetPasswordTimeoutListener());
}
@Bean
public ChannelTopic verificationCodeTopic(){
return new ChannelTopic(verificationCodeTopic);
}
@Bean
@DependsOn(value = "taskExecutor")
public RedisMessageListenerContainer fundMessageListenerContainer(
@Qualifier("taskExecutor")Executor executor){
RedisMessageListenerContainer messageListenerContainer = new RedisMessageListenerContainer();
messageListenerContainer.setConnectionFactory(redisConnectionFactory());
messageListenerContainer.addMessageListener(
verificationCodeMessageListenerAdapter(), verificationCodeTopic());
messageListenerContainer.addMessageListener(
resetPasswordTimeoutMessageListenerAdapter(), new PatternTopic("__keyevent@*__:expired"));
messageListenerContainer.setTaskExecutor(executor);
return messageListenerContainer;
}
@Bean
public MessagePublisher verificationCodeMessagePublisher(){
return new VerificationCodePublisher(
verificationMessageRedisTemplate(), verificationCodeTopic());
}
@Bean
public RedisSerializer verificationMessageSerializer(){
return new Jackson2JsonRedisSerializer(VerificationMessage.class);
}
@Bean
@Primary
public RedisSerializer resetPasswordRequestSerializer(){
return new Jackson2JsonRedisSerializer(ResetPasswordRequest.class);
}
@Bean
public RedisSerializer redisKeySerializer(){
return new Jackson2JsonRedisSerializer(FundRedisKey.class);
}
}
这是我的 ResetPasswordTimeoutSubscriber:
/**
 * Redis message listener that deserializes the body of each received message as a
 * {@code ResetPasswordRequest}. The follow-up notification is not implemented yet.
 */
@Component
public class ResetPasswordTimeoutSubscriber implements MessageListener {

    @Value("${spring.redis.key}")
    private String key;

    // Wildcard instead of a raw type; callers passing a raw RedisSerializer still compile.
    private final RedisSerializer<?> messageSerializer;

    /**
     * @param messageSerializer serializer used to decode the message body
     */
    public ResetPasswordTimeoutSubscriber(RedisSerializer<?> messageSerializer) {
        this.messageSerializer = messageSerializer;
    }

    /**
     * Decodes the message body; the decoded request is not used yet (see TODO).
     *
     * @param message the received Redis message
     * @param bytes   second listener argument, unused here
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        // NOTE(review): when this listener is subscribed to "expired" key-events, the Redis
        // keyspace-notification docs say the body is the expired key's name, not the stored
        // value. Confirm the injected serializer matches that payload.
        ResetPasswordRequest resetPasswordRequest =
                (ResetPasswordRequest) messageSerializer.deserialize(message.getBody());
        //TODO Send operation timeout notification
    }
}
这是我的 TaskExecutor 配置
/**
 * Async and scheduling configuration: provides the {@code taskExecutor} bean and the
 * scheduler used for scheduled tasks, both sized from {@code JHipsterProperties}.
 */
@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer, SchedulingConfigurer {

    // One logger per class rather than per instance.
    private static final Logger LOG = LoggerFactory.getLogger(AsyncConfiguration.class);

    private final JHipsterProperties jHipsterProperties;

    /**
     * @param jHipsterProperties source of the async pool settings
     */
    public AsyncConfiguration(JHipsterProperties jHipsterProperties) {
        this.jHipsterProperties = jHipsterProperties;
    }

    /**
     * Creates a thread-pool executor from the configured core size, max size and queue
     * capacity, wrapped in an {@code ExceptionHandlingAsyncTaskExecutor}.
     *
     * @return the executor registered under the bean name {@code taskExecutor}
     */
    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        LOG.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
        executor.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
        executor.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
        executor.setThreadNamePrefix("app-1-Executor-");
        return new ExceptionHandlingAsyncTaskExecutor(executor);
    }

    /**
     * @return a {@code SimpleAsyncUncaughtExceptionHandler} for uncaught async exceptions
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }

    /**
     * Sets {@link #scheduledTaskExecutor()} as the scheduler for scheduled tasks.
     *
     * @param taskRegistrar the registrar to configure
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(scheduledTaskExecutor());
    }

    /**
     * Scheduled thread pool whose size is the configured async core pool size.
     *
     * @return the scheduled executor service
     */
    @Bean
    public ScheduledExecutorService scheduledTaskExecutor() {
        return Executors.newScheduledThreadPool(jHipsterProperties.getAsync().getCorePoolSize());
    }
}
我之前没有在 redisKeySerializer 方法上添加 @Bean 注解。我已将修正后的正确代码更新到问题正文中。