RabbitMQ returns 相同的消息一次又一次

RabbitMQ returns same message again and again

我尝试对一个相当基本的场景进行单元测试 - 具有 2 个工作人员和 1 个发布者场景的工作队列,但它不断从队列中一遍又一遍地返回相同的消息。

测试中的以下代码只是将 1 到 100 条消息放入队列,然后 2 个消费者将它们吃掉。问题是他们一直只收到消息 1 和 2。我试图将确认分成一个方法,因为在我的应用程序中,消息需要时间来处理(注释方法 Confirm ) - 然后它抛出一个异常,令牌是未知的:

The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - unknown delivery tag 1", classId=60, methodId=80, cause=

看来确认不知何故被打破了。我试图将其关闭 - 也不走运。

Class:

using System;
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Backend.MQ.OCR
{
    public class BatchQueue : QueueBase<BatchMessage>
    {
        private readonly IModel _channel;
        private const string QPrefix = "ocrbatches_";
        private readonly QueueingBasicConsumer _consumer;
        private ulong _latesttoken = ulong.MaxValue;
        private readonly string _jobid;
        public BatchQueue(string connectionString, String jobid):
            base(connectionString)
        {
            _jobid = jobid;
            var factory = new ConnectionFactory()
            {
                HostName = connectionString
            };
            var connection = factory.CreateConnection();
            _channel = connection.CreateModel();
            _channel.QueueDeclare(Name, true, false, false, null);
            //binding consumers
            _channel.BasicQos(0, 1, false);
            _consumer = new QueueingBasicConsumer(_channel);
            _channel.BasicConsume(Name, false, _consumer);
        }

        public override void Publish(BatchMessage msg)
        {
            var message = JsonConvert.SerializeObject(msg);
            var body = Encoding.UTF8.GetBytes(message);
            var properties = _channel.CreateBasicProperties();
            properties.SetPersistent(true);
            _channel.BasicPublish("", Name, properties, body);
#if DEBUG
            System.Diagnostics.Trace.WriteLine("[x] Sent task:" + msg);
#endif 
        }

        private string Name
        {
            get { return QPrefix + _jobid; }
        } 

        public override BatchMessage Receive()
        {
            var ea =
                    (BasicDeliverEventArgs)_consumer.Queue.Dequeue();

            var body = ea.Body;
            _channel.BasicAck(ea.DeliveryTag, false);
            return JsonConvert.DeserializeObject<BatchMessage>(Encoding.UTF8.GetString(body));
        }


        public override void Confirm()
        {
            //if (_latesttoken < ulong.MaxValue) _channel.BasicAck(_latesttoken, false);
        }
    }
}

单元测试:

#if NUNIT
using TestClass = NUnit.Framework.TestFixtureAttribute;
using TestMethod = NUnit.Framework.TestAttribute;
using TestCleanup = NUnit.Framework.TearDownAttribute;
using TestInitialize = NUnit.Framework.SetUpAttribute;
using ClassCleanup = NUnit.Framework.TestFixtureTearDownAttribute;
using ClassInitialize = NUnit.Framework.TestFixtureSetUpAttribute;
#else
#endif
using System.Threading.Tasks;
using System.Threading;
using System;
using System.Collections.Generic;
using Backend.MQ.OCR;
using Microsoft.VisualStudio.TestTools.UnitTesting;
#if NUNIT
using MAssert = NUnit.Framework.Assert;
#else
using MAssert = Microsoft.VisualStudio.TestTools.UnitTesting.Assert;
#endif

namespace MQ.Test
{
    [TestClass]
    public class BatchQueueTest
    {
        [TestMethod]
        public void Concurrencytest()
        {
            var batchname = Guid.NewGuid().ToString();
            var queue = new BatchQueue("localhost", batchname);
            var tasks = new List<Task>();
            var counter = 0;
            for (int i = 0; i < 100; i++)
            {
                queue.Publish(new BatchMessage()
                {
                    Files = new List<string>() { i.ToString() }
                });
            }
            for (int i = 0; i < 2; i++)
            {
                var task = Task.Factory.StartNew(() =>
                {
                    var q = new BatchQueue("localhost", batchname);
                    var res = q.Receive();
                    while (res != null)
                    {
                        System.Diagnostics.Trace.WriteLine(res.Files[0]);
                        q.Confirm();
                        Interlocked.Increment(ref counter);
                    }
                });
                tasks.Add(task);
            }
            var ok = Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(30));
            MAssert.IsTrue(ok, "Tasks didnt complete in time");
            MAssert.AreEqual(counter, 100, "Not all messages have been processed");

        }
    }
}

您的单元测试启动了两个任务。在 while 循环之前您收到一条消息,但您在 while 循环内不断确认相同的消息:

var q = new BatchQueue("localhost", batchname);
//Receive message 1 or 2
var res = q.Receive();
while (res != null)
{   //Infinite loop
    System.Diagnostics.Trace.WriteLine(res.Files[0]);
    q.Confirm();
    Interlocked.Increment(ref counter);
}

尝试将var res = q.Receive();放入循环