Spring Integration: migrating from RabbitMQ to Redis to share application events
We are migrating our microservices application from RabbitMQ to Redis.
Here is our service activator:
@ServiceActivator(inputChannel = ApplicationEventChannelNames.REMOTE_CHANNEL)
public void handleApplicationEvent(@Header(value = ApplicationEventHeaders.APPLICATION_EVENT) final ApplicationEvent event,
@Payload Object message) {
...
}
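Initially we had a problem: the application event was being lost in SimpleMessageConverter. We solved it by implementing a CustomRedisMessageConverter. Its fromMessage method puts the application event into the payload, and its toMessage method retrieves it from the payload and creates a new message with the application event in the headers.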
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
if (message.getHeaders().get(ApplicationEventHeaders.APPLICATION_EVENT) != null) {
Map<String, Object> map = new HashMap<>();
map.put("headers", ((ApplicationEvent) message.getHeaders().get(ApplicationEventHeaders.APPLICATION_EVENT)).getName());
map.put("payload", message.getPayload());
GenericMessage<Map<String, Object>> msg = new GenericMessage<>(map, message.getHeaders());
return super.fromMessage(msg, targetClass);
}
return super.fromMessage(message, targetClass);
}
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
try {
final Map<String, ?> message = new ObjectMapper().readValue((String) payload, new TypeReference<Map<String, ?>>() {});
if (message.get("headers") != null) {
final Map<String, Object> messageHeaders = new HashMap<>(headers);
messageHeaders.put(ApplicationEventHeaders.APPLICATION_EVENT, new ApplicationEvent((String) message.get("headers")));
return super.toMessage(message.get("payload"), new MessageHeaders(messageHeaders));
}
} catch (JsonProcessingException exception) {
/* Intentionally left blank */
}
return super.toMessage(payload, headers);
}
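We would like to know whether there is a better way to do this.
Finally, the payload in the service activator is a LinkedHashMap, but we want it to be the object. With RabbitMQ this was taken care of.
Is there a way to do the same with Redis? Or do we have to use headers to track the payload type and convert it back to an object manually?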
Update - Redis configuration
@Bean
public RedisInboundChannelAdapter applicationEventInboundChannelAdapter(@Value(value = "${com.xxx.xxx.xxx.integration.spring.topic}") String topic,
MessageChannel applicationEventRemoteChannel,
RedisConnectionFactory connectionFactory) {
final RedisInboundChannelAdapter inboundChannelAdapter = new RedisInboundChannelAdapter(connectionFactory);
inboundChannelAdapter.setTopics(topic);
inboundChannelAdapter.setOutputChannel(applicationEventRemoteChannel);
inboundChannelAdapter.setErrorChannel(errorChannel());
inboundChannelAdapter.setMessageConverter(new CustomRedisMessageConverter());
return inboundChannelAdapter;
}
@ServiceActivator(inputChannel = "errorChannel")
public void processError(MessageHandlingException exception) {
try {
logger.error(
"Could not process {}, got exception: {}",
exception.getFailedMessage().getPayload(),
exception.getMessage());
logger.error(
ExceptionUtils.readStackTrace(exception));
} catch (Throwable throwable) {
logger.error(
"Got {} during processing with message: {} ",
MessageHandlingException.class.getSimpleName(),
exception);
}
}
@Bean
@ServiceActivator(inputChannel = ApplicationEventChannelNames.LOCAL_CHANNEL)
public RedisPublishingMessageHandler redisPublishingMessageHandler(@Value(value = "${com.xxx.xxx.xxx.integration.spring.topic}") String topic,
RedisConnectionFactory redisConnectionFactory) {
final RedisPublishingMessageHandler redisPublishingMessageHandler = new RedisPublishingMessageHandler(redisConnectionFactory);
redisPublishingMessageHandler.setTopic(topic);
redisPublishingMessageHandler.setSerializer(new Jackson2JsonRedisSerializer<>(String.class));
redisPublishingMessageHandler.setMessageConverter(new CustomRedisMessageConverter());
return redisPublishingMessageHandler;
}
/*
* MessageChannel
*/
@Bean
public MessageChannel errorChannel() {
return new DirectChannel();
}
Redis does not support message headers, so you have to embed them into the body. See EmbeddedJsonHeadersMessageMapper, which can be supplied on both sides to org.springframework.integration.support.converter.SimpleMessageConverter.
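To illustrate the idea of carrying headers inside the body, here is a minimal, JDK-only sketch. It is not how EmbeddedJsonHeadersMessageMapper is implemented (that class produces JSON). The HeaderEnvelope class, its length-prefixed binary layout, and the applicationEvent header name mentioned below are all hypothetical, chosen only so the example runs without any dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: packs string headers and a payload into one byte[] body. */
final class HeaderEnvelope {

    /** Result of decoding a body: the recovered headers and the raw payload bytes. */
    static final class Decoded {
        final Map<String, String> headers;
        final byte[] payload;

        Decoded(Map<String, String> headers, byte[] payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    private HeaderEnvelope() {
    }

    /** Writes: header count, each key/value pair, payload length, payload bytes. */
    static byte[] encode(Map<String, String> headers, byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(headers.size());
            for (Map.Entry<String, String> entry : headers.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the layout produced by encode and restores headers and payload. */
    static Decoded decode(byte[] body) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(body))) {
            int count = in.readInt();
            Map<String, String> headers = new LinkedHashMap<>();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                headers.put(key, value);
            }
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return new Decoded(headers, payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, encoding a header map that contains applicationEvent=USER_CREATED together with a payload, and then decoding the resulting bytes on the subscriber side, gives back the same header and payload. That is the round trip a pub/sub transport without native headers needs. A payload type could be carried the same way, as one more header.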