C ++中的无锁多生产者多消费者
Lock-free multiple producer multiple consumer in C++
我必须用 C++ 编写一个多生产者-消费者系统,但我在尝试将模型的每个部分(具有正确缓冲区的线程)放在一起时迷路了。该模型的基本功能是:我有一个执行函数的初始线程。这个 returned 结果需要放入不确定数量的缓冲区中,因为函数处理的每个元素都是不同的,需要在单个线程中处理。然后,随着buffer中存储的数据,另外n
个线程需要获取这个buffer中的数据来做另外一个功能,这个的return需要再次放入一些buffer中。
目前我已经创建了这个缓冲区结构:
template <typename T>
class buffer {
public:
atomic_buffer(int n);
int bufSize() const noexcept;
bool bufEmpty() const noexcept;
bool full() const noexcept;
~atomic_buffer() = default;
void put(const T & x, bool last) noexcept;
std::pair<bool,T> get() noexcept;
private:
int next_pos(int p) const noexcept;
private:
struct item {
bool last;
T value;
};
const int size_;
std::unique_ptr<item[]> buf_;
alignas(64) std::atomic<int> nextRd_ {0};
alignas(64) std::atomic<int> nextWrt_ {0};
};
我还创建了一个 vector
结构,用于存储非缓冲区集合,以满足不确定数量的线程需求。
std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;
for(int i=0; i<n; i++){
v1.push_back(std::unique_ptr<locked_buffer<std::pair<int,std::vector<std::vector<unsigned char>>>>> (new locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>(aux)));
}
编辑:
在不知道更多上下文的情况下,这看起来像是一个标准线程池的应用程序。您有不同的任务排队到同步队列(例如 buffer
class 那里)。线程池的每个工作线程轮询这个队列,每次处理一个任务(例如执行run()
方法)。他们将结果写回另一个同步队列。
每个工作线程都有一对自己的线程局部输入和输出缓冲区。它们不需要同步,因为它们只能从所有者线程本身访问。
编辑: 其实,我认为这可以简化很多:只需使用一个线程池和一个同步队列。工作线程可以将新任务直接排入队列。绘图中的每个线程都对应一种类型的任务,并实现一个通用的 Task
接口。
您不需要多个缓冲区。您可以使用多态性并将所有内容放在一个缓冲区中。
编辑 2 - 线程池的解释:
线程池只是一个概念。忘记池方面,使用固定数量的线程。主要思想是:不再有多个具有特定功能的线程,而是有 N 个线程可以处理任何类型的任务。其中 N 是 CPU.
的核心数
你可以改造这个
进入
工作线程执行如下操作。请注意,这是简化的,但您应该明白了。
void Thread::run(buffer<Task*>& queue) {
while(true) {
Task* task = queue.get();
if(task)
task->execute();
while(queue.isEmpty())
waitUntilQueueHasElement();
}
}
并且您的任务实现了一个通用接口,因此您可以将 Task*
个指针放入单个队列中:
struct Task {
virtual void execute() = 0;
}
struct Task1 : public Task {
virtual void execute() override {
A();
B1();
C();
}
}
...
另外,帮自己一个忙,使用 typedef ;)
`std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;`
变成
typedef std::vector<std::vector<unsigned char>> vector2D_uchar;
typedef std::pair<int, vector2D_uchar> int_vec_pair;
typedef std::unique_ptr<locked_buffer<int_vec_pair>> locked_buffer_ptr;
std::vector<locked_buffer_ptr> v1;
我必须用 C++ 编写一个多生产者-消费者系统,但我在尝试将模型的每个部分(具有正确缓冲区的线程)放在一起时迷路了。该模型的基本功能是:我有一个执行函数的初始线程。这个 returned 结果需要放入不确定数量的缓冲区中,因为函数处理的每个元素都是不同的,需要在单个线程中处理。然后,随着buffer中存储的数据,另外n
个线程需要获取这个buffer中的数据来做另外一个功能,这个的return需要再次放入一些buffer中。
目前我已经创建了这个缓冲区结构:
template <typename T>
class buffer {
public:
atomic_buffer(int n);
int bufSize() const noexcept;
bool bufEmpty() const noexcept;
bool full() const noexcept;
~atomic_buffer() = default;
void put(const T & x, bool last) noexcept;
std::pair<bool,T> get() noexcept;
private:
int next_pos(int p) const noexcept;
private:
struct item {
bool last;
T value;
};
const int size_;
std::unique_ptr<item[]> buf_;
alignas(64) std::atomic<int> nextRd_ {0};
alignas(64) std::atomic<int> nextWrt_ {0};
};
我还创建了一个 vector
结构,用于存储非缓冲区集合,以满足不确定数量的线程需求。
std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;
for(int i=0; i<n; i++){
v1.push_back(std::unique_ptr<locked_buffer<std::pair<int,std::vector<std::vector<unsigned char>>>>> (new locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>(aux)));
}
编辑:
在不知道更多上下文的情况下,这看起来像是一个标准线程池的应用程序。您有不同的任务排队到同步队列(例如 buffer
class 那里)。线程池的每个工作线程轮询这个队列,每次处理一个任务(例如执行run()
方法)。他们将结果写回另一个同步队列。
每个工作线程都有一对自己的线程局部输入和输出缓冲区。它们不需要同步,因为它们只能从所有者线程本身访问。
编辑: 其实,我认为这可以简化很多:只需使用一个线程池和一个同步队列。工作线程可以将新任务直接排入队列。绘图中的每个线程都对应一种类型的任务,并实现一个通用的 Task
接口。
您不需要多个缓冲区。您可以使用多态性并将所有内容放在一个缓冲区中。
编辑 2 - 线程池的解释:
线程池只是一个概念。忘记池方面,使用固定数量的线程。主要思想是:不再有多个具有特定功能的线程,而是有 N 个线程可以处理任何类型的任务。其中 N 是 CPU.
你可以改造这个
进入
工作线程执行如下操作。请注意,这是简化的,但您应该明白了。
void Thread::run(buffer<Task*>& queue) {
while(true) {
Task* task = queue.get();
if(task)
task->execute();
while(queue.isEmpty())
waitUntilQueueHasElement();
}
}
并且您的任务实现了一个通用接口,因此您可以将 Task*
个指针放入单个队列中:
struct Task {
virtual void execute() = 0;
}
struct Task1 : public Task {
virtual void execute() override {
A();
B1();
C();
}
}
...
另外,帮自己一个忙,使用 typedef ;)
`std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;`
变成
typedef std::vector<std::vector<unsigned char>> vector2D_uchar;
typedef std::pair<int, vector2D_uchar> int_vec_pair;
typedef std::unique_ptr<locked_buffer<int_vec_pair>> locked_buffer_ptr;
std::vector<locked_buffer_ptr> v1;