内存泄漏传出消息rabbitmq

Memory leak outgoing messages rabbitmq

我有一个 Spring 启动应用程序正在使用 Spring 集成。随着时间的推移,我有增加内存的问题。当我 运行 探查器似乎没有收到确认时,我可以在出站端点中看到问题。当我尝试在本地调试确认处理时,一切看起来都很好。它只出现在发送大量消息的 k8s 环境中(也许这就是本地环境没有问题的原因)。该错误似乎是在 springboot 从 1.6 版升级到 2.3 版(以及所有依赖项)后出现的。这就是为什么我认为它有些配置错误,但我不确定在哪里或为什么。

版本:

配置:

protected RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    rabbitTemplate.setMessageConverter(new ContentTypeDelegatingMessageConverter());
    
    rabbitTemplate.setChannelTransacted(true);

    rabbitTemplate.setMandatory(amqpMandatoryFlag);
    return rabbitTemplate;
}

return IntegrationFlows.from(inputFlowChannel)
                .transform(messageRequestsTransformer())
                .transform(new ObjectToStringTransformer())
                .enrichHeaders(headers)
                .log(LoggingHandler.Level.INFO, "amqpOutboundConnectorLogging",
                        "headers.id + ': outboundAMQPPayload=' + payload")
                .handle(Amqp.outboundAdapter(rabbitTemplate())
                        .confirmCorrelationExpression("payload")
                        .returnChannel(returnChannel)
                        .exchangeName(amqpExchangeTarget)
                        .defaultDeliveryMode(messageDeliveryMode)
                        .headersMappedLast(true)
                );

public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setUri(this.amqpBrokerUri);
    connectionFactory.setChannelCacheSize(this.amqpChannelCacheSize);
    connectionFactory.setCacheMode(CacheMode.CHANNEL);
    connectionFactory.setRequestedHeartBeat(60);
    connectionFactory.setPublisherReturns(true);
    return connectionFactory;
}

分析器屏幕:

如果需要其他配置我会添加。

感谢您的任何建议。

感谢转载;有几个问题;根本原因是使用 returns.

的交易

您不能将发布者确认与交易一起使用,confirms/returns 旨在协同工作;没有任何东西可以触发删除待处理的 returns.

安排对 template.getUnconfirmed() 的调用通常有助于清除未决的 returns,但它不起作用。

内存泄漏的原因有几个,其中最重要的是通道没有增加 nextPublishSeqNo,因此 pendingConfirms 索引被破坏; pendingConfirms 映射用于触发清理,因此我们仍然保留 pendingReturns

解决方法(目前)是使用发布者确认而不是交易(或禁用 [​​=42=]),但我会看看我是否可以在使用交易时想出一个解决方法。

connectionFactory.setPublisherConfirmType(ConfirmType.CORRELATED);

编辑

我发现了问题;问题是您要告诉适配器为每个请求创建关联数据...

.confirmCorrelationExpression("payload")

...这是对模板的指示,用于维护未决的 confirms/returns 映射,这些映射永远不会被清除,因为无法通过交易启用确认。

删除它,适配器将不会为确认配置模板。

https://github.com/spring-projects/spring-amqp/issues/1439