为什么 RedisMessageListenerContainer 没有启动?

Why isn't RedisMessageListenerContainer starting up?

我正在尝试使用 Spring Data Redis 启动 Pub/Sub 设置,并且我能够加载发布者,但 RedisMessageListenerContainer 不会自动启动。我正在使用 Spring Data Redis 2.2.8.RELEASE 以及嵌入式 Redis 服务器(it.ozimov.embedded-redis 版本 0.7.2)。有谁知道为什么 RedisMessageListenerContainer 无法启动?

这是我的类。

RedisListenerAutoConfiguration

@Configuration
@ConditionalOnProperty(prefix = "myproj.redis", name = "mode", havingValue = "LISTENER", matchIfMissing = true)
@ComponentScan("com.jcworx.redis.listener")
@ConditionalOnBean(type = "com.jcworx.redis.listener.RedisMessageListener")
@AutoConfigureAfter(RedisAutoConfiguration.class)
@EnableConfigurationProperties(RedisConfigurationProperties.class)
public class RedisListenerAutoConfiguration {

    @Autowired
    private RedisConfigurationProperties redisConfigurationProperties;

    @Bean
    public MessageListenerAdapter messageListenerAdapter(RedisMessageListener<?> redisMessageListener){
        return new MessageListenerAdapter(redisMessageListener,"onRedisMessage");
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(messageListenerAdapter, new ChannelTopic(redisConfigurationProperties.getQueueName()));
        return container;
    }
}

SimpleRedisMessageListener

@Component
public class SimpleRedisMessageListener extends AbstractRedisMessageListener<SimpleType>{

    private static final Logger LOG = LoggerFactory.getLogger(SimpleRedisMessageListener.class);

    private CountDownLatch countDownLatch;

    @Override
    public void processRedisMsg(RedisMessage<SimpleType> redisMsg) {
        LOG.info("Processing Message. trxId={}, payload={}",redisMsg.getTrxId(),redisMsg.getPayload());
        Assert.notNull(countDownLatch,"Count Down Latch cannot be null.");
        countDownLatch.countDown();
    }

    public CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

}

RedisServerConfiguration

@Configuration
@Profile("test")
public class RedisServerConfiguration {
    private RedisServer redisServer;

    @Autowired //redisProperties autowired from RedisAutoConfiguration
    public RedisServerConfiguration(RedisProperties redisProperties){
        redisServer = new RedisServer(redisProperties.getPort());
    }

    @PostConstruct
    public void postConstruct(){
        redisServer.start();
    }

    @PreDestroy
    public void preDestroy(){
        redisServer.stop();
    }
}

应用-test.properties

#application test resources
myproj.redis.queueName=test
spring.redis.host=localhost
spring.redis.port=6379
#set to true when you need to see the auto configuration rules
debug=true 

RedisPubSubABTTest

@SpringBootTest(classes = TestRedisApp.class)
@ActiveProfiles("test")
public class RedisPubSubABTTest {
    
    @Autowired
    private RedisMessagePublisher redisMessagePublisher;

    @Autowired
    private SimpleRedisMessageListener simpleRedisMessageListener;

    /**
     * Send a message to the embedded redis queue and await the listener to respond. If it
     * responds, then the countdown latch will count down to 0. Otherwise, it will time out
     * and fail to respond.
     * @throws InterruptedException
     */
    @Test
    public void messageSentAndReceived() throws InterruptedException{
        //ARRANGE
        SimpleType simpleType = new SimpleType();
        simpleType.setFirstName("John");
        simpleType.setLastName("Smith");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        simpleRedisMessageListener.setCountDownLatch(countDownLatch);
        RedisMessage<SimpleType> redisMsg = new RedisMessage.Builder<SimpleType>().TrxId(UUID.randomUUID().toString())
                                                                                              .payload(simpleType)
                                                                                              .build();
        //ACT
        redisMessagePublisher.publish(redisMsg);
        boolean responded = countDownLatch.await(5, TimeUnit.SECONDS);
        //ASSERT
        Assertions.assertTrue(responded);
    }
    
}

事实证明,MessageListenerAdapter 使用 RedisSerializer.string() 作为默认序列化程序。这意味着侦听器方法的参数列表中除 String 之外的任何 POJO 都将被忽略。为了解决这个问题,您需要调用 setSerializer 方法并将 RedisSerializer.java() 作为参数传入。这会让 MessageListenerAdapter 知道 POJO 是 java class 并且需要 serialized/deserialized。请注意,您决定传递的任何 pojo 都必须实现 java.io.Serializable。请参阅下面的示例,希望这对其他人有所帮助。

@Bean
public MessageListenerAdapter messageListenerAdapter(RedisMessageListener<?> redisMessageListener){
    MessageListenerAdapter msgAdapter = new MessageListenerAdapter(redisMessageListener,"onRedisMessage");
    msgAdapter.setSerializer(RedisSerializer.java());
    return msgAdapter;
}