RabbitMQ durable queue does not work (RPC-Server, RPC-Client)

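I'm wondering why my RabbitMQ RPC-Client always processes dead (stale) messages after a restart. As I understand it, _channel.QueueDeclare(queue, false, false, false, null); should disable buffering. If I use a different QueueDeclare overload in the RPC-Client, I can no longer connect to the server. Is something wrong here? Any idea how to fix this?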


RPC-Server

new Thread(() =>
{
    var factory = new ConnectionFactory { HostName = _hostname };
    if (_port > 0)
        factory.Port = _port;
    _connection = factory.CreateConnection();
    _channel = _connection.CreateModel();

    _channel.QueueDeclare(queue, false, false, false, null);
    _channel.BasicQos(0, 1, false);
    var consumer = new QueueingBasicConsumer(_channel);
    _channel.BasicConsume(queue, false, consumer);
    IsRunning = true;
    while (IsRunning)
    {
        BasicDeliverEventArgs ea;
        try {
            ea = consumer.Queue.Dequeue();
        }
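        catch (Exception) {
            // Dequeue throws once the channel or connection is closed; leave the loop
            // here, otherwise 'ea' would be used below without ever being assigned.
            IsRunning = false;
            break;
        }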
        var body = ea.Body;
        var props = ea.BasicProperties;
        var replyProps = _channel.CreateBasicProperties();
        replyProps.CorrelationId = props.CorrelationId;

        var xmlRequest = Encoding.UTF8.GetString(body);

        var messageRequest = XmlSerializer.DeserializeObject(xmlRequest, typeof(Message)) as Message;
        var messageResponse = handler(messageRequest);

        _channel.BasicPublish("", props.ReplyTo, replyProps,
                                messageResponse);
        _channel.BasicAck(ea.DeliveryTag, false);
    }
}).Start();

RPC-Client

public void Start()
{
    if (IsRunning)
        return;
    var factory = new ConnectionFactory { 
        HostName = _hostname,
        Endpoint = _port <= 0 ? new AmqpTcpEndpoint(_endpoint) 
                              : new AmqpTcpEndpoint(_endpoint, _port)
    };
    _connection = factory.CreateConnection();
    _channel = _connection.CreateModel();
    _replyQueueName = _channel.QueueDeclare(); // Do not connect any more
    _consumer = new QueueingBasicConsumer(_channel);
    _channel.BasicConsume(_replyQueueName, true, _consumer);
    IsRunning = true;
}

public Message Call(Message message)
{
    if (!IsRunning)
        throw new Exception("Connection is not open.");
    var corrId = Guid.NewGuid().ToString().Replace("-", "");
    var props = _channel.CreateBasicProperties();
    props.ReplyTo = _replyQueueName;
    props.CorrelationId = corrId;

    if (!String.IsNullOrEmpty(_application))
        props.AppId = _application;

    message.InitializeProperties(_hostname, _nodeId, _uniqueId, props);

    var messageBytes = Encoding.UTF8.GetBytes(XmlSerializer.ConvertToString(message));
    _channel.BasicPublish("", _queue, props, messageBytes);

    try 
    {
        while (IsRunning)
        {
            var ea = _consumer.Queue.Dequeue();
            if (ea.BasicProperties.CorrelationId == corrId)
            {
                var xmlResponse = Encoding.UTF8.GetString(ea.Body);
                try
                {
                    return XmlSerializer.DeserializeObject(xmlResponse, typeof(Message)) as Message;
                }
                catch(Exception ex)
                {
                    IsRunning = false;
                    return null;
                }
            }
        }
    }
    catch (EndOfStreamException ex)
    {
        IsRunning = false;
        return null;
    }
    return null;
}

Try setting the DeliveryMode property to non-persistent (1) in your RPC-Client code, like this:

public Message Call(Message message)
{
   ...
   var props = _channel.CreateBasicProperties();
   props.DeliveryMode = 1; //you might want to do this in your RPC-Server as well
   ...
}

AMQP Model Explained is a very useful resource; among other things, it explains how to handle messages that end up in a dead letter queue.

Another useful note from the documentation, on queue durability:

Durable queues are persisted to disk and thus survive broker restarts. Queues that are not durable are called transient. Not all scenarios and use cases mandate queues to be durable.

Durability of a queue does not make messages that are routed to that queue durable. If broker is taken down and then brought back up, durable queue will be re-declared during broker startup, however, only persistent messages will be recovered.

Note that it talks about broker restarts, not publisher or consumer restarts.
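To make the distinction between queue durability and message persistence concrete, here is a minimal sketch. It assumes a RabbitMQ.Client version that still exposes IModel, as the code in the question does. The class name DurabilitySketch and its two helper methods are hypothetical and only serve as illustration.

```csharp
using System.Text;
using RabbitMQ.Client;

// Hypothetical helper class, not part of RabbitMQ.Client.
public static class DurabilitySketch
{
    // Queue durability: durable = true means the queue definition itself
    // survives a broker restart. The question's code passes false here.
    public static void DeclareQueue(IModel channel, string queue, bool durable)
    {
        channel.QueueDeclare(queue: queue,
                             durable: durable,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);
    }

    // Message persistence: DeliveryMode 2 = persistent, 1 = non-persistent.
    // Per the documentation quoted above, only persistent messages are
    // recovered after a broker restart, and only from a durable queue.
    public static void Publish(IModel channel, string queue, string text, bool persistent)
    {
        var props = channel.CreateBasicProperties();
        props.DeliveryMode = persistent ? (byte)2 : (byte)1;

        // The default exchange ("") routes by queue name.
        channel.BasicPublish("", queue, props, Encoding.UTF8.GetBytes(text));
    }
}
```

Neither setting controls what happens when only the client or the server process restarts: while the broker keeps running, messages already sitting in a queue stay there regardless of DeliveryMode. Also keep in mind that redeclaring an existing queue with different flags fails with a PRECONDITION_FAILED channel error, so both sides have to declare the queue with the same arguments.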