为什么 RedisMessageListenerContainer 没有启动?
Why isn't RedisMessageListenerContainer starting up?
我正在尝试使用 Spring Data Redis 启动 Pub/Sub 设置,并且我能够加载发布者,但 RedisMessageListenerContainer 不会自动启动。我正在使用 Spring Data Redis 2.2.8.RELEASE 以及嵌入式 Redis 服务器(it.ozimov.embedded-redis 版本 0.7.2)。有谁知道为什么 RedisMessageListenerContainer 无法启动?
这是我的类。
RedisListenerAutoConfiguration
/**
 * Auto-configuration that wires a Redis Pub/Sub listener: it wraps the application's
 * {@code RedisMessageListener} bean in a {@code MessageListenerAdapter} and registers that
 * adapter with a {@code RedisMessageListenerContainer} on the configured channel.
 *
 * <p>Applies only when {@code myproj.redis.mode} is {@code LISTENER} or is not set, and a
 * {@code com.jcworx.redis.listener.RedisMessageListener} bean is present.
 */
@Configuration
@ConditionalOnProperty(prefix = "myproj.redis", name = "mode", havingValue = "LISTENER", matchIfMissing = true)
@ComponentScan("com.jcworx.redis.listener")
@ConditionalOnBean(type = "com.jcworx.redis.listener.RedisMessageListener")
@AutoConfigureAfter(RedisAutoConfiguration.class)
@EnableConfigurationProperties(RedisConfigurationProperties.class)
public class RedisListenerAutoConfiguration {

    /** Project Redis settings; {@code getQueueName()} supplies the channel name. */
    @Autowired
    private RedisConfigurationProperties redisConfigurationProperties;

    /**
     * Creates the adapter that delegates incoming messages to the listener's
     * {@code onRedisMessage} method.
     *
     * <p>NOTE(review): no serializer is configured here, so the adapter's default applies —
     * confirm it matches the argument type that {@code onRedisMessage} expects.
     *
     * @param redisMessageListener the listener bean to delegate to
     * @return the adapter wrapping the listener
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter(RedisMessageListener<?> redisMessageListener) {
        final String handlerMethod = "onRedisMessage";
        return new MessageListenerAdapter(redisMessageListener, handlerMethod);
    }

    /**
     * Creates the listener container and subscribes the adapter to the channel named by
     * {@code redisConfigurationProperties.getQueueName()}.
     *
     * @param connectionFactory the Redis connection factory the container uses
     * @param messageListenerAdapter the adapter to register as a listener
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);

        ChannelTopic topic = new ChannelTopic(redisConfigurationProperties.getQueueName());
        listenerContainer.addMessageListener(messageListenerAdapter, topic);
        return listenerContainer;
    }
}
SimpleRedisMessageListener
/**
 * Listener for {@code RedisMessage<SimpleType>} messages. Each processed message is logged
 * and counts down a caller-supplied {@link CountDownLatch}, so a caller can wait for delivery.
 */
@Component
public class SimpleRedisMessageListener extends AbstractRedisMessageListener<SimpleType> {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleRedisMessageListener.class);

    // volatile: the latch is installed through the setter and read in processRedisMsg, which
    // may run on a different thread than the caller that installed it.
    private volatile CountDownLatch countDownLatch;

    /**
     * Logs the message's transaction id and payload, then counts the latch down by one.
     * Fails the {@code Assert.notNull} check if no latch has been set.
     *
     * @param redisMsg the received message
     */
    @Override
    public void processRedisMsg(RedisMessage<SimpleType> redisMsg) {
        LOG.info("Processing Message. trxId={}, payload={}", redisMsg.getTrxId(), redisMsg.getPayload());
        // Read the field once so the null check and countDown() act on the same latch,
        // even if the setter is called concurrently.
        final CountDownLatch latch = countDownLatch;
        Assert.notNull(latch, "Count Down Latch cannot be null.");
        latch.countDown();
    }

    /**
     * @return the latch currently installed, or {@code null} if none has been set
     */
    public CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }

    /**
     * Installs the latch that {@link #processRedisMsg} counts down.
     *
     * @param countDownLatch the latch to count down on each processed message
     */
    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
}
RedisServerConfiguration
/**
 * Configuration active under the {@code test} profile that owns an embedded
 * {@code RedisServer}: the server is started after construction and stopped before
 * destruction of this bean.
 */
@Configuration
@Profile("test")
public class RedisServerConfiguration {

    private final RedisServer redisServer;

    /**
     * Creates the embedded server on the port reported by the given properties.
     *
     * @param redisProperties Redis settings; only {@code getPort()} is read
     */
    @Autowired // redisProperties autowired from RedisAutoConfiguration
    public RedisServerConfiguration(RedisProperties redisProperties) {
        this.redisServer = new RedisServer(redisProperties.getPort());
    }

    /** Starts the embedded server once this bean has been constructed. */
    @PostConstruct
    public void postConstruct() {
        this.redisServer.start();
    }

    /** Stops the embedded server before this bean is destroyed. */
    @PreDestroy
    public void preDestroy() {
        this.redisServer.stop();
    }
}
application-test.properties
#application test resources
myproj.redis.queueName=test
spring.redis.host=localhost
spring.redis.port=6379
#set to true when you need to see the auto configuration rules
debug=true
RedisPubSubABTTest
/**
 * Integration test for the publish/subscribe round trip, run with the {@code test} profile
 * against the {@code TestRedisApp} context.
 */
@SpringBootTest(classes = TestRedisApp.class)
@ActiveProfiles("test")
public class RedisPubSubABTTest {

    @Autowired
    private RedisMessagePublisher redisMessagePublisher;

    @Autowired
    private SimpleRedisMessageListener simpleRedisMessageListener;

    /**
     * Publishes one message and waits up to five seconds for the listener to count the
     * latch down to zero. If the wait times out, the assertion fails.
     *
     * @throws InterruptedException if the thread is interrupted while awaiting the latch
     */
    @Test
    public void messageSentAndReceived() throws InterruptedException {
        // ARRANGE
        SimpleType payload = new SimpleType();
        payload.setFirstName("John");
        payload.setLastName("Smith");

        CountDownLatch deliveryLatch = new CountDownLatch(1);
        simpleRedisMessageListener.setCountDownLatch(deliveryLatch);

        String trxId = UUID.randomUUID().toString();
        RedisMessage<SimpleType> message = new RedisMessage.Builder<SimpleType>()
                .TrxId(trxId)
                .payload(payload)
                .build();

        // ACT
        redisMessagePublisher.publish(message);

        // ASSERT
        Assertions.assertTrue(deliveryLatch.await(5, TimeUnit.SECONDS));
    }
}
事实证明,MessageListenerAdapter 默认使用 RedisSerializer.string() 作为序列化器。这意味着,如果侦听器方法的参数是 String 以外的 POJO,消息将被忽略。要解决这个问题,需要调用 setSerializer 方法,并传入 RedisSerializer.java() 作为参数。这样 MessageListenerAdapter 就知道该 POJO 是一个 Java 对象,需要进行序列化/反序列化。请注意,您传递的任何 POJO 都必须实现 java.io.Serializable。请参阅下面的示例,希望对其他人有所帮助。
/**
 * Creates the adapter that delegates incoming messages to the listener's
 * {@code onRedisMessage} method, with the adapter's serializer set to
 * {@code RedisSerializer.java()} instead of its default.
 *
 * <p>NOTE(review): this presumably means payloads go through Java-native serialization, so
 * they must implement {@code java.io.Serializable} and should only come from trusted
 * publishers — confirm.
 *
 * @param redisMessageListener the listener bean to delegate to
 * @return the adapter wrapping the listener
 */
@Bean
public MessageListenerAdapter messageListenerAdapter(RedisMessageListener<?> redisMessageListener) {
    MessageListenerAdapter adapter = new MessageListenerAdapter(redisMessageListener, "onRedisMessage");
    adapter.setSerializer(RedisSerializer.java());
    return adapter;
}
我正在尝试使用 Spring Data Redis 启动 Pub/Sub 设置,并且我能够加载发布者,但 RedisMessageListenerContainer 不会自动启动。我正在使用 Spring Data Redis 2.2.8.RELEASE 以及嵌入式 Redis 服务器(it.ozimov.embedded-redis 版本 0.7.2)。有谁知道为什么 RedisMessageListenerContainer 无法启动?
这是我的类。
RedisListenerAutoConfiguration
/**
 * Auto-configuration that wires a Redis Pub/Sub listener: it wraps the application's
 * {@code RedisMessageListener} bean in a {@code MessageListenerAdapter} and registers that
 * adapter with a {@code RedisMessageListenerContainer} on the configured channel.
 *
 * <p>Applies only when {@code myproj.redis.mode} is {@code LISTENER} or is not set, and a
 * {@code com.jcworx.redis.listener.RedisMessageListener} bean is present.
 */
@Configuration
@ConditionalOnProperty(prefix = "myproj.redis", name = "mode", havingValue = "LISTENER", matchIfMissing = true)
@ComponentScan("com.jcworx.redis.listener")
@ConditionalOnBean(type = "com.jcworx.redis.listener.RedisMessageListener")
@AutoConfigureAfter(RedisAutoConfiguration.class)
@EnableConfigurationProperties(RedisConfigurationProperties.class)
public class RedisListenerAutoConfiguration {

    /** Project Redis settings; {@code getQueueName()} supplies the channel name. */
    @Autowired
    private RedisConfigurationProperties redisConfigurationProperties;

    /**
     * Creates the adapter that delegates incoming messages to the listener's
     * {@code onRedisMessage} method.
     *
     * <p>NOTE(review): no serializer is configured here, so the adapter's default applies —
     * confirm it matches the argument type that {@code onRedisMessage} expects.
     *
     * @param redisMessageListener the listener bean to delegate to
     * @return the adapter wrapping the listener
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter(RedisMessageListener<?> redisMessageListener) {
        final String handlerMethod = "onRedisMessage";
        return new MessageListenerAdapter(redisMessageListener, handlerMethod);
    }

    /**
     * Creates the listener container and subscribes the adapter to the channel named by
     * {@code redisConfigurationProperties.getQueueName()}.
     *
     * @param connectionFactory the Redis connection factory the container uses
     * @param messageListenerAdapter the adapter to register as a listener
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);

        ChannelTopic topic = new ChannelTopic(redisConfigurationProperties.getQueueName());
        listenerContainer.addMessageListener(messageListenerAdapter, topic);
        return listenerContainer;
    }
}
SimpleRedisMessageListener
/**
 * Listener for {@code RedisMessage<SimpleType>} messages. Each processed message is logged
 * and counts down a caller-supplied {@link CountDownLatch}, so a caller can wait for delivery.
 */
@Component
public class SimpleRedisMessageListener extends AbstractRedisMessageListener<SimpleType> {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleRedisMessageListener.class);

    // volatile: the latch is installed through the setter and read in processRedisMsg, which
    // may run on a different thread than the caller that installed it.
    private volatile CountDownLatch countDownLatch;

    /**
     * Logs the message's transaction id and payload, then counts the latch down by one.
     * Fails the {@code Assert.notNull} check if no latch has been set.
     *
     * @param redisMsg the received message
     */
    @Override
    public void processRedisMsg(RedisMessage<SimpleType> redisMsg) {
        LOG.info("Processing Message. trxId={}, payload={}", redisMsg.getTrxId(), redisMsg.getPayload());
        // Read the field once so the null check and countDown() act on the same latch,
        // even if the setter is called concurrently.
        final CountDownLatch latch = countDownLatch;
        Assert.notNull(latch, "Count Down Latch cannot be null.");
        latch.countDown();
    }

    /**
     * @return the latch currently installed, or {@code null} if none has been set
     */
    public CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }

    /**
     * Installs the latch that {@link #processRedisMsg} counts down.
     *
     * @param countDownLatch the latch to count down on each processed message
     */
    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
}
RedisServerConfiguration
/**
 * Configuration active under the {@code test} profile that owns an embedded
 * {@code RedisServer}: the server is started after construction and stopped before
 * destruction of this bean.
 */
@Configuration
@Profile("test")
public class RedisServerConfiguration {

    private final RedisServer redisServer;

    /**
     * Creates the embedded server on the port reported by the given properties.
     *
     * @param redisProperties Redis settings; only {@code getPort()} is read
     */
    @Autowired // redisProperties autowired from RedisAutoConfiguration
    public RedisServerConfiguration(RedisProperties redisProperties) {
        this.redisServer = new RedisServer(redisProperties.getPort());
    }

    /** Starts the embedded server once this bean has been constructed. */
    @PostConstruct
    public void postConstruct() {
        this.redisServer.start();
    }

    /** Stops the embedded server before this bean is destroyed. */
    @PreDestroy
    public void preDestroy() {
        this.redisServer.stop();
    }
}
application-test.properties
#application test resources
myproj.redis.queueName=test
spring.redis.host=localhost
spring.redis.port=6379
#set to true when you need to see the auto configuration rules
debug=true
RedisPubSubABTTest
/**
 * Integration test for the publish/subscribe round trip, run with the {@code test} profile
 * against the {@code TestRedisApp} context.
 */
@SpringBootTest(classes = TestRedisApp.class)
@ActiveProfiles("test")
public class RedisPubSubABTTest {

    @Autowired
    private RedisMessagePublisher redisMessagePublisher;

    @Autowired
    private SimpleRedisMessageListener simpleRedisMessageListener;

    /**
     * Publishes one message and waits up to five seconds for the listener to count the
     * latch down to zero. If the wait times out, the assertion fails.
     *
     * @throws InterruptedException if the thread is interrupted while awaiting the latch
     */
    @Test
    public void messageSentAndReceived() throws InterruptedException {
        // ARRANGE
        SimpleType payload = new SimpleType();
        payload.setFirstName("John");
        payload.setLastName("Smith");

        CountDownLatch deliveryLatch = new CountDownLatch(1);
        simpleRedisMessageListener.setCountDownLatch(deliveryLatch);

        String trxId = UUID.randomUUID().toString();
        RedisMessage<SimpleType> message = new RedisMessage.Builder<SimpleType>()
                .TrxId(trxId)
                .payload(payload)
                .build();

        // ACT
        redisMessagePublisher.publish(message);

        // ASSERT
        Assertions.assertTrue(deliveryLatch.await(5, TimeUnit.SECONDS));
    }
}
事实证明,MessageListenerAdapter 默认使用 RedisSerializer.string() 作为序列化器。这意味着,如果侦听器方法的参数是 String 以外的 POJO,消息将被忽略。要解决这个问题,需要调用 setSerializer 方法,并传入 RedisSerializer.java() 作为参数。这样 MessageListenerAdapter 就知道该 POJO 是一个 Java 对象,需要进行序列化/反序列化。请注意,您传递的任何 POJO 都必须实现 java.io.Serializable。请参阅下面的示例,希望对其他人有所帮助。
/**
 * Creates the adapter that delegates incoming messages to the listener's
 * {@code onRedisMessage} method, with the adapter's serializer set to
 * {@code RedisSerializer.java()} instead of its default.
 *
 * <p>NOTE(review): this presumably means payloads go through Java-native serialization, so
 * they must implement {@code java.io.Serializable} and should only come from trusted
 * publishers — confirm.
 *
 * @param redisMessageListener the listener bean to delegate to
 * @return the adapter wrapping the listener
 */
@Bean
public MessageListenerAdapter messageListenerAdapter(RedisMessageListener<?> redisMessageListener) {
    MessageListenerAdapter adapter = new MessageListenerAdapter(redisMessageListener, "onRedisMessage");
    adapter.setSerializer(RedisSerializer.java());
    return adapter;
}