如何处理多个源队列消息类型

How to handle multiple source queue message types

总的来说,我对并行编程有些陌生,我正在尝试了解如何处理我的系统等待多种消息类型的情况。这个问题主要是为了简单地检查我的理解。我的特定系统是使用 Microsoft Concurrency Runtime (C++) 实现的。

遇到了另外两个 SO 问题 -

Google Protocol Buffers, how to handle multiple message-Types?

msmq multiple message types in a single queue

似乎一般的方法是创建一个通用消息来表示消息的所有不同子类型。公共消息中包含一些元数据,用于确定特定的子类型消息。然后所有消息都通过一个队列传输;读取时,根据消息元数据确定特定的子类型并执行适当的操作。

但是,在我的特定应用程序中,我宁愿能够使用专门用于每种消息类型的多个队列,然后等待所有队列,处理最先获得消息的队列。但是,我可以看到这种方法导致的几个明显的读取竞争条件 -

1) 似乎不可能以相同的顺序为不同类型的消息提供服务,因为它们是单独排队的。问题。

2) 我可以想象两条消息同时可用。通过任何调度机制,一个被读取,另一个被……丢弃。失败。

仅凭这两个问题,我就很愿意放弃后一种方法。但是在我烧毁我的小思想实验之前,有没有人可以解决上述两个问题来保护多源队列?还是确实只能使用一个通用消息格式的队列?

如果你真的想制作多个队列并拥有总订单,那么你将需要一个票务系统,可能会使用 atomic int(所有代码都未经测试)

int GetTicket() {
  static std::atomic<int> myTicket { 0 };

  return myTicket++;
}

然后按正确顺序将它们从队列中剥离。

但是使用 std::function<WhatEver> 队列更容易,可以是无锁队列,或者如果你想要简单的互斥保护队列,像这样

#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void EnQueue(std::function<WhatEver> func) {
  std::unique_lock<std::mutex> lck(mtx);
  myQueue.push_back(std::move(func));
  ready = true;

  cv.notify_one();
}



void print_id (int id) {
  while(true) {
    std::unique_lock<std::mutex> lck(mtx);
    while (!ready) cv.wait(lck);

    auto func = myQueue.front();
    myQueue.pop();

    func();
  }
}