ConcurrentDictionary 重复错误中的 ConcurrentQueue

ConcurrentQueue in a ConcurrentDictionary Duplicate Error

我有一个线程处理 每 10 秒 收到的消息,并让另一个线程 每分钟 将这些消息写入数据库。 每条消息都有一个不同的发件人,在我的例子中名为 serialNumber

因此,我创建了一个如下所示的 ConcurrentDictionary。

public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;

字典的关键字是serialNumber,值为1分钟消息的集合。 我想要收集一分钟数据的原因是每 10 秒访问一次数据库,而不是每分钟访问一次,这样我可以将流程减少 1/6 倍。

public class ShotManager
    {
        private const int SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER = 25000;
        private bool ACTIVE_FILE_DB_SHOOT_THREAD = false;
        private List<Devices> _devices = new List<Devices>();
        public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;

        public ShotManager()
        {
            ACTIVE_FILE_DB_SHOOT_THREAD = Utility.GetAppSettings("AppConfig", "0", "ACTIVE_LIST_DB_SHOOT") == "1";
            init();
        }
        private void init()
        {
            using (iotemplaridbContext dbContext = new iotemplaridbContext())
                _devices = (from d in dbContext.Devices select d).ToList();

            if (_dicAllPackets is null)
                _dicAllPackets = new ConcurrentDictionary<string, ConcurrentQueue<PacketModel>>();

            foreach (var device in _devices)
            {
                if(!_dicAllPackets.ContainsKey(device.SerialNumber))
                    _dicAllPackets.TryAdd(device.SerialNumber, new ConcurrentQueue<PacketModel> { });
            }
        }
        public void Spinner()
        {
            while (ACTIVE_FILE_DB_SHOOT_THREAD)
            {
                try
                {
                    Parallel.ForEach(_dicAllPackets, devicePacket =>
                    {
                        Thread.Sleep(100);
                        readAndShot(devicePacket);
                    });
                    Thread.Sleep(SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER);
                    //init();
                }
                catch (Exception ex)
                {
                    //init();
                    tLogger.EXC("Spinner exception for write...", ex);
                }
            }
        }
        public void EnqueueObjectToQueue(string serialNumber, PacketModel model)
        {
            if (_dicAllPackets != null)
            {
                if (!_dicAllPackets.ContainsKey(serialNumber))
                    _dicAllPackets.TryAdd(serialNumber, new ConcurrentQueue<PacketModel> { });
                else
                    _dicAllPackets[serialNumber].Enqueue(model);
            }
        }
        private void readAndShot(KeyValuePair<string, ConcurrentQueue<PacketModel>> keyValuePair)
        {
            StringBuilder sb = new StringBuilder();
            if (keyValuePair.Value.Count() <= 0)
            {
                return;
            }
            sb.AppendLine($"INSERT INTO ......) VALUES(");
//the reason why I don't use while(TryDequeue(out ..)){..} is there's constantly enqueue to this dictionary, so the thread will be occupied with a single device for so long
            for (int i = 0; i < 10; i++)
            {
                keyValuePair.Value.TryDequeue(out PacketModel packet);
                if (packet != null)
                {
                    /*
                     *** do something and fill the sb...
                     */
                }
                else
                {
                    Console.WriteLine("No packet found! For Device: " + keyValuePair.Key);
                    break;
                }
            }
            insertIntoDB(sb.ToString()[..(sb.Length - 5)] + ";");
        }
    }

EnqueueObjectToQueue 来电者来自另一个 class,如下所示。

private void packetToDictionary(string serialNumber, string jsonPacket, string messageTimeStamp)
{
            PacketModel model = new PacketModel {
                MachineData = jsonPacket, 
                DataInsertedAt = messageTimeStamp
            };
            _shotManager.EnqueueObjectToQueue(serialNumber, model);
}

我调用上述函数的方式来自于处理函数本身。

private void messageReceiveHandler(object sender, MessageReceviedEventArgs e){
   //do something...parse from e and call the func
   string jsonPacket = ""; //something parsed from e 
   string serialNumber = ""; //something parsed from e
   string message_timestamp = DateTime.Now().ToString("yyyy-MM-dd HH:mm:ss");
   ThreadPool.QueueUserWorkItem(state => packetToDictionary(serialNumber, str, message_timestamp));
}

问题是有时一些数据包在错误的 serialNumber 下排队或重复自身(重复条目)。

像这样在 ConcurrentDictionary 中使用 ConcurrentQueue 是不是很聪明?

不,使用带有嵌套 ConcurrentQueueConcurrentDictionary 作为值不是一个好主意。不可能自动更新这个结构。以此为例:

if (!_dicAllPackets.ContainsKey(serialNumber))
    _dicAllPackets.TryAdd(serialNumber, new ConcurrentQueue<PacketModel> { });
else
    _dicAllPackets[serialNumber].Enqueue(model);

这段代码充满了竞态条件。 运行 此代码的线程可以在 ContainsKeyTryAdd[] 索引器和 Enqueue 调用之间的任何点被另一个线程拦截,改变结构的状态,并使当前线程工作的正确性所基于的条件无效。

A ConcurrentDictionary 当你有一个包含不可变值的简单 Dictionary 时,你想同时使用它,并且在每次访问周围使用 lock 可能是个好主意引起重大争论。您可以在此处阅读更多相关信息:

我的建议是切换到简单的 Dictionary<string, Queue<PacketModel>>,并与 lock 同步。如果你很小心并且在持有锁时避免做任何无关紧要的事情,那么锁将被释放得如此之快以至于其他线程很少会被它阻塞。使用锁只是为了保护结构的特定条目的读取和更新,没有别的。

替代设计

A ConcurrentDictionary<string, Queue<PacketModel>> 结构可能是一个不错的选择,前提是您从未从字典中删除队列。否则仍然存在 space 的竞争条件。您应该专门使用 GetOrAdd 方法在字典中以原子方式获取或添加队列,并且在对其进行任何操作(读取或写入)之前始终使用 queue 本身作为储物柜:

Queue<PacketModel> queue = _dicAllPackets
    .GetOrAdd(serialNumber, _ => new Queue<PacketModel>());
lock (queue)
{
    queue.Enqueue(model);
}

也可以使用 ConcurrentDictionary<string, ImmutableQueue<PacketModel>>,因为在这种情况下 ConcurrentDictionary 的值是不可变的,您不需要 lock 任何东西。您需要始终使用 AddOrUpdate 方法,以便通过一次调用更新字典,作为原子操作。

_dicAllPackets.AddOrUpdate
(
    serialNumber,
    key => ImmutableQueue.Create<PacketModel>(model),
    (key, queue) => queue.Enqueue(model)
);

updateValueFactory 委托中的 queue.Enqueue(model) 调用不会改变队列。相反,它会创建一个新的 ImmutableQueue<PacketModel> 并丢弃前一个。不可变集合通常不是很有效。但是,如果您的目标是以增加每个线程必须完成的工作为代价最小化线程之间的争用,那么您可能会发现它们很有用。