redis 订阅者不能与 redis 发布者一起工作
redis subscriber can't work with redis publisher
我现在在用java设计redispub/sub系统,遇到了问题。我会告诉你详情:
这里的发布者:
public class RedisMessagePublisher implements MessagePublisher {
public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic)
{
this.redisTemplate = redisTemplate;
this.topic = topic;
}
private StringRedisTemplate redisTemplate;
private ChannelTopic topic;
@Override
public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
发布者正确,可以正常工作。
然后让我们转到订阅者 class:
public class RedisMessageSubscriber implements MessageListener {
//action inspect here
private Action2<Message, byte[]> action;
public void setAction(Action2<Message, byte[]> action) {
logger.info("action set");
this.action = action;
}
private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class);
@Override
public void onMessage(Message message, byte[] bytes) {
logger.info("===> redis subscribe message in <===");
if (action != null)
action.call(message, bytes);
else
logger.info("===> action is null <===");
}
}
在订阅者 class 中,我使用 RxJava 注入 Action 以便我可以更轻松地使用它。
但是问题来了,我发布publisher的消息后,我发现消息可以被运行转发给onMessage方法,日志打印不是我所期望的:
===> redis subscribe message in <===
===> action is null <===
我期望的是,当我发布一条新消息时,订阅者会收到它并且 运行 我创建的操作。
下面我用来触发发布者和订阅者的服务:
@RestController("redispubsubcontroller")
@RequestMapping(value = "/redis")
public class redispubsubcontroller {
@Autowired
private RedisMessagePublisher redisMessagePublisher;
@Autowired
private RedisMessageSubscriber redisMessageSubscriber;
private static Logger logger = LogManager.getLogger(redispubsubcontroller.class);
@RequestMapping(value = "/publisher", method = {RequestMethod.GET})
public ApiResponse getConfig(String message,HttpServletRequest request,
HttpServletResponse response) {
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
redisMessagePublisher.publish(message);
return new ApiResponse("success","message sent");
}
}
从上面的代码中,你可以知道我订阅了主题并为订阅者设置了一个新的操作:
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
但是不知道为什么,触发发布者后,订阅者可以获取消息但是hold NULL动作依旧,我创建的 Action 没有传递给它。
有人可以帮忙吗?这个机制有什么问题吗?
====编辑=====
RedisMessageConfig 代码如下:
@Configuration
public class RedisMessageConfig {
@Bean
ChannelTopic topic() {
return new ChannelTopic("useraddresspubsub:queue");
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}
@Autowired
private RedisConnectionFactory JedisConnectionFactory;
@Bean
RedisMessageListenerContainer redisContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(JedisConnectionFactory);
container.addMessageListener(messageListener(), topic());
return container;
}
}
====已解决====
最后我按照 mp 的想法解决了这个问题,将 myredismessagesubscriber 稍微更改为 myredismessageconfig 因为流程是从 redismessageconfig 到 redismessagesubscriber,所以在 redismessageconfig 中,我需要先向它注入操作,然后 redismessageconfig 将创建新的 redismessagesubscriber 并保持新创建的动作。代码如下:
@Component
public class MyRedisMessageConfig extends RedisMessageConfig {
private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class);
public MyRedisMessageConfig() {
super.action = new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
String result = new String(message.getBody());
logger.info("received:" + result);
}
};
}
}
截图如下:
这不是 MessageListener
的工作方式。此外,您还创建了共享的可变状态。两个并发调用同时更改 RedisMessageSubscriber
.
的状态
我假设您 运行 遇到可见性问题,因为您在一个线程中设置 action
并且消息接收发生在另一个线程中。
如果每个 MessageListener
需要不同的行为,则创建多个实现该行为的侦听器。
我现在在用java设计redispub/sub系统,遇到了问题。我会告诉你详情:
这里的发布者:
public class RedisMessagePublisher implements MessagePublisher {
public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic)
{
this.redisTemplate = redisTemplate;
this.topic = topic;
}
private StringRedisTemplate redisTemplate;
private ChannelTopic topic;
@Override
public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
发布者正确,可以正常工作。
然后让我们转到订阅者 class:
public class RedisMessageSubscriber implements MessageListener {
//action inspect here
private Action2<Message, byte[]> action;
public void setAction(Action2<Message, byte[]> action) {
logger.info("action set");
this.action = action;
}
private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class);
@Override
public void onMessage(Message message, byte[] bytes) {
logger.info("===> redis subscribe message in <===");
if (action != null)
action.call(message, bytes);
else
logger.info("===> action is null <===");
}
}
在订阅者 class 中,我使用 RxJava 注入 Action 以便我可以更轻松地使用它。
但是问题来了,我发布publisher的消息后,我发现消息可以被运行转发给onMessage方法,日志打印不是我所期望的:
===> redis subscribe message in <===
===> action is null <===
我期望的是,当我发布一条新消息时,订阅者会收到它并且 运行 我创建的操作。
下面我用来触发发布者和订阅者的服务:
@RestController("redispubsubcontroller")
@RequestMapping(value = "/redis")
public class redispubsubcontroller {
@Autowired
private RedisMessagePublisher redisMessagePublisher;
@Autowired
private RedisMessageSubscriber redisMessageSubscriber;
private static Logger logger = LogManager.getLogger(redispubsubcontroller.class);
@RequestMapping(value = "/publisher", method = {RequestMethod.GET})
public ApiResponse getConfig(String message,HttpServletRequest request,
HttpServletResponse response) {
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
redisMessagePublisher.publish(message);
return new ApiResponse("success","message sent");
}
}
从上面的代码中,你可以知道我订阅了主题并为订阅者设置了一个新的操作:
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
但是不知道为什么,触发发布者后,订阅者可以获取消息但是hold NULL动作依旧,我创建的 Action 没有传递给它。
有人可以帮忙吗?这个机制有什么问题吗?
====编辑=====
RedisMessageConfig 代码如下:
@Configuration
public class RedisMessageConfig {
@Bean
ChannelTopic topic() {
return new ChannelTopic("useraddresspubsub:queue");
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}
@Autowired
private RedisConnectionFactory JedisConnectionFactory;
@Bean
RedisMessageListenerContainer redisContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(JedisConnectionFactory);
container.addMessageListener(messageListener(), topic());
return container;
}
}
====已解决====
最后我按照 mp 的想法解决了这个问题,将 myredismessagesubscriber 稍微更改为 myredismessageconfig 因为流程是从 redismessageconfig 到 redismessagesubscriber,所以在 redismessageconfig 中,我需要先向它注入操作,然后 redismessageconfig 将创建新的 redismessagesubscriber 并保持新创建的动作。代码如下:
@Component
public class MyRedisMessageConfig extends RedisMessageConfig {
private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class);
public MyRedisMessageConfig() {
super.action = new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
String result = new String(message.getBody());
logger.info("received:" + result);
}
};
}
}
截图如下:
这不是 MessageListener
的工作方式。此外,您还创建了共享的可变状态。两个并发调用同时更改 RedisMessageSubscriber
.
我假设您 运行 遇到可见性问题,因为您在一个线程中设置 action
并且消息接收发生在另一个线程中。
如果每个 MessageListener
需要不同的行为,则创建多个实现该行为的侦听器。