Spring 从 RabbitMQ 集成迁移到 Redis 以共享应用程序事件

Spring integration migration to Redis from RabbitMQ to share application events

我们正在将我们的微服务应用程序从 RabbitMQ 迁移到 Redis。

这是我们的服务激活器

@ServiceActivator(inputChannel = ApplicationEventChannelNames.REMOTE_CHANNEL)
public void handleApplicationEvent(@Header(value = ApplicationEventHeaders.APPLICATION_EVENT) final ApplicationEvent event,
                                   @Payload Object message) {
...
}

最初我们遇到了一个问题,我们在 SimpleMessageConverter 中丢失了应用程序事件。我们通过实现 CustomRedisMessageConverter 并将应用程序事件放入 fromMessage 方法中的有效载荷并从有效载荷中检索它并在 [=18= 中创建一个带有应用程序事件的新消息 headers 来解决它]方法。

    @Override
    public Object fromMessage(Message<?> message, Class<?> targetClass) {
        if (message.getHeaders().get(ApplicationEventHeaders.APPLICATION_EVENT) != null) {

            Map<String, Object> map = new HashMap<>();

            map.put("headers", ((ApplicationEvent) message.getHeaders().get(ApplicationEventHeaders.APPLICATION_EVENT)).getName());
            map.put("payload", message.getPayload());

            GenericMessage<Map<String, Object>> msg = new GenericMessage<>(map, message.getHeaders());

            return super.fromMessage(msg, targetClass);
        }
        return super.fromMessage(message, targetClass);
    }

    @Override
    public Message<?> toMessage(Object payload, MessageHeaders headers) {

        try {
            final Map<String, ?> message = new ObjectMapper().readValue((String) payload, new TypeReference<Map<String, ?>>() {});

            if (message.get("headers") != null) {
                final Map<String, Object> messageHeaders = new HashMap<>(headers);

                messageHeaders.put(ApplicationEventHeaders.APPLICATION_EVENT, new ApplicationEvent((String) message.get("headers")));

                return super.toMessage(message.get("payload"), new MessageHeaders(messageHeaders));
            }
        } catch (JsonProcessingException exception) {
            /* Intentionally left blank */
        }

        return super.toMessage(payload, headers);
    }

我们想知道是否有更好的方法来做到这一点?

最后,服务激活器中的有效载荷是 LinkedHashMap,但我们希望它是 object。使用 RabbitMQ 可以解决这个问题。

有什么方法可以在 Redis 中做同样的事情吗?或者我们是否使用 headers 来跟踪有效负载的类型并手动将它们转换为 object?

更新 - REDIS 配置

    @Bean
    public RedisInboundChannelAdapter applicationEventInboundChannelAdapter(@Value(value = "${com.xxx.xxx.xxx.integration.spring.topic}") String topic,
                                                                            MessageChannel applicationEventRemoteChannel,
                                                                            RedisConnectionFactory connectionFactory) {



        final RedisInboundChannelAdapter inboundChannelAdapter = new RedisInboundChannelAdapter(connectionFactory);
        inboundChannelAdapter.setTopics(topic);
        inboundChannelAdapter.setOutputChannel(applicationEventRemoteChannel);
        inboundChannelAdapter.setErrorChannel(errorChannel());
        inboundChannelAdapter.setMessageConverter(new CustomRedisMessageConverter());

        return inboundChannelAdapter;
    }

    @ServiceActivator(inputChannel = "errorChannel")
    public void processError(MessageHandlingException exception) {

        try {

            logger.error(
                    "Could not process {}, got exception: {}",
                    exception.getFailedMessage().getPayload(),
                    exception.getMessage());

            logger.error(
                    ExceptionUtils.readStackTrace(exception));

        } catch (Throwable throwable) {

            logger.error(
                    "Got {} during processing with message: {} ",
                    MessageHandlingException.class.getSimpleName(),
                    exception);
        }
    }

    @Bean
    @ServiceActivator(inputChannel = ApplicationEventChannelNames.LOCAL_CHANNEL)
    public RedisPublishingMessageHandler redisPublishingMessageHandler(@Value(value = "${com.xxx.xxx.xxx.integration.spring.topic}") String topic,
                                                                       RedisConnectionFactory redisConnectionFactory) {

        final RedisPublishingMessageHandler redisPublishingMessageHandler = new RedisPublishingMessageHandler(redisConnectionFactory);


        redisPublishingMessageHandler.setTopic(topic);
        redisPublishingMessageHandler.setSerializer(new Jackson2JsonRedisSerializer<>(String.class));
        redisPublishingMessageHandler.setMessageConverter(new CusomRedisMessageConverter());
        return redisPublishingMessageHandler;
    }

    /*
     * MessageChannel
     */

    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }

Redis不支持headers,所以你必须将它们嵌入到body中。请参阅 EmbeddedJsonHeadersMessageMapper 可以在两侧提供给 org.springframework.integration.support.converter.SimpleMessageConverter