Redisson异步处理消息
Redisson async procesing messages
我正在尝试将 Redisson 功能应用于我的项目作为消息代理,但我有一个问题。是否可以将 Redisson 推送到异步接收到的消息之前?我创建了一个小示例,从不同的 URL 发送了 4 条消息。我预计,Redisson 会异步处理它们,但它是一个接一个地完成的。
这里的实现:
public class RedisListenerServiceImpl implements MessageListener<String> {
private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void onMessage(CharSequence channel, String stringMsg) {
log.info("Message received: {}", stringMsg);
MessageDto msg;
try {
msg = objectMapper.readValue(stringMsg, MessageDto.class);
} catch (final IOException e) {
log.error("Unable to deserialize message: {}", e.getMessage(), e);
return;
}
try {
//Do my stuff
} catch (Exception e) {
log.error("Unable to get service from factory: {}", e.getMessage(), e);
}
}
}
和配置:
@Configuration
public class RedisListenerConfig {
@Autowired
public RedisListenerConfig(RedissonClient redisClient,
MessageListener redisListenerService,
@Value("${redis.sub.key}") String redisSubKey) {
RTopic subscribeTopic = redisClient.getTopic(redisSubKey);
subscribeTopic.addListenerAsync(String.class, redisListenerService);
}
}
这是预期的行为。如果你想让你的消息在ListeneronMessage()方法被触发的时候被并发处理,只需要使用线程池来处理它。
由于 Redisson 不知道您要使用多少个线程来处理触发的事件,因此它将实现细节留给您。
public class RedisListenerServiceImpl implements MessageListener<String> {
private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
private final ObjectMapper objectMapper = new ObjectMapper();
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Override
public void onMessage(CharSequence channel, String stringMsg) {
log.info("Message received: {}", stringMsg);
MessageDto msg;
try {
msg = objectMapper.readValue(stringMsg, MessageDto.class);
executorService.submit(()->{
System.out.println("do something with message: "+msg);
});
} catch (final IOException e) {
log.error("Unable to deserialize message: {}", e.getMessage(), e);
return;
}
try {
//Do my stuff
} catch (Exception e) {
log.error("Unable to get service from factory: {}", e.getMessage(), e);
}
}
我正在尝试将 Redisson 功能应用于我的项目作为消息代理,但我有一个问题。是否可以将 Redisson 推送到异步接收到的消息之前?我创建了一个小示例,从不同的 URL 发送了 4 条消息。我预计,Redisson 会异步处理它们,但它是一个接一个地完成的。 这里的实现:
public class RedisListenerServiceImpl implements MessageListener<String> {
private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void onMessage(CharSequence channel, String stringMsg) {
log.info("Message received: {}", stringMsg);
MessageDto msg;
try {
msg = objectMapper.readValue(stringMsg, MessageDto.class);
} catch (final IOException e) {
log.error("Unable to deserialize message: {}", e.getMessage(), e);
return;
}
try {
//Do my stuff
} catch (Exception e) {
log.error("Unable to get service from factory: {}", e.getMessage(), e);
}
}
}
和配置:
@Configuration
public class RedisListenerConfig {
@Autowired
public RedisListenerConfig(RedissonClient redisClient,
MessageListener redisListenerService,
@Value("${redis.sub.key}") String redisSubKey) {
RTopic subscribeTopic = redisClient.getTopic(redisSubKey);
subscribeTopic.addListenerAsync(String.class, redisListenerService);
}
}
这是预期的行为。如果你想让你的消息在ListeneronMessage()方法被触发的时候被并发处理,只需要使用线程池来处理它。
由于 Redisson 不知道您要使用多少个线程来处理触发的事件,因此它将实现细节留给您。
public class RedisListenerServiceImpl implements MessageListener<String> {
private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
private final ObjectMapper objectMapper = new ObjectMapper();
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Override
public void onMessage(CharSequence channel, String stringMsg) {
log.info("Message received: {}", stringMsg);
MessageDto msg;
try {
msg = objectMapper.readValue(stringMsg, MessageDto.class);
executorService.submit(()->{
System.out.println("do something with message: "+msg);
});
} catch (final IOException e) {
log.error("Unable to deserialize message: {}", e.getMessage(), e);
return;
}
try {
//Do my stuff
} catch (Exception e) {
log.error("Unable to get service from factory: {}", e.getMessage(), e);
}
}