在流水线执行中使用并行性
Employing parallelism in pipelined execution
我正在尝试开发一个管道,其中数据首先被读取和处理,操作一次,以不同的方式操作,然后显示。我有一个设计,其中数据 IO 馈送到第一个操纵器读取的缓冲区。随后,第一个操纵器写入另一个缓冲区,该缓冲区在可能时由第二个操纵器读取。最后,第二个操纵器的输出被写入显示缓冲区,由可视化工具读取并使用 OpenGL 显示。
在我看来,这是一个相当简单的并行问题,其中每个任务都有自己的线程,并且它们通过数据缓冲区进行通信。但是,我遇到的所有关于线程程序的教程似乎都表明,多线程应该留给某些决定如何分配工作负载的中间件(如 OpenMP)。
我是开发多线程应用程序的新手,所以这可能是一个愚蠢的问题,但我所描述的是否可行,是否可以使用像 OpenMP 这样的中间件来完成?我意识到明显的答案是 "try it," 并且我想这样做,但是教程没有说明*如何*尝试它。
OpenMP 更适合轻松跨越多核 (SIMD) 的算法。其他情况也是可能的,但在您的情况下,我认为直接使用线程会更好,并且更容易编码和维护。
我将我的回答分成两部分:一个通用解决方案 没有 OpenMP,以及一些使用 OpenMP 的特定更改。
如评论中所述,您正面临 producer/consumer 问题,但有两次:一个线程正在填充缓冲区(生成一个项目),然后必须由第二个线程读取(和修改)一个(消耗)。您的问题的特殊性在于,第二个线程也是生产者(要绘制的图像),第三个线程是负责使用它的线程(可视化器)。
如您所知,P/C 问题是使用缓冲区(可能是循环缓冲区或生产项目队列)解决的,其中缓冲区的每个元素都被标记为生产或消费,并且在哪里线程在添加或从中获取项目时具有独占访问权限。
让我们在下面的示例程序中使用队列方法来解决您的问题。
- 生成的项目将存储在队列中。队列的前端包含最旧的元素,即必须首先使用的元素。
- 有两个队列:一个用于第一个操纵器产生的数据(并由第二个操纵器使用),另一个用于第二个操纵器产生的数据(并且将由另一个线程可视化) ).
- 生产阶段很简单:获得对相应队列的独占访问权并在末尾插入元素。
- 消费类似,但必须等待队列至少有一个元素(不为空)。
- 我添加了一些 sleeps 来模拟其他操作。
- 停止条件仅供说明之用。
注意:为了简单起见,我假设您可以使用 C++11 编译器。使用其他API实现也比较类似。
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <list>
using namespace std::chrono_literals;
std::mutex g_data_produced_by_m1_mutex;
std::list<int> g_data_produced_by_m1;
std::mutex g_data_produced_by_m2_mutex;
std::list<int> g_data_produced_by_m2;
std::atomic<bool> stop = false;
void manipulator1_kernel()
{
while (!stop) {
// Producer 1: generate data
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
g_data_produced_by_m1.push_back(rand());
}
std::this_thread::sleep_for(100ms);
}
}
void manipulator2_kernel()
{
int data;
while (!stop) {
// Consumer 1
while (!stop) { // wait until there is an item to be consumed
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
if (!g_data_produced_by_m1.empty()) { // is there data to be consumed?
data = g_data_produced_by_m1.front(); // consume
g_data_produced_by_m1.pop_front();
break;
}
}
std::this_thread::sleep_for(100ms);
}
// Producer 2: modify and send to the visualizer
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
g_data_produced_by_m2.push_back(5 * data);
}
std::this_thread::sleep_for(100ms);
}
}
void visualizer_kernel()
{
int data;
while (!stop) {
// Consumer 2
while (!stop) { // wait until there is an item to be visualized
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
if (!g_data_produced_by_m2.empty()) {
data = g_data_produced_by_m2.front();
g_data_produced_by_m2.pop_front();
break;
}
}
std::this_thread::sleep_for(100ms);
}
std::cout << data << std::endl; // render to display
std::this_thread::sleep_for(100ms);
if (data % 8 == 0) stop = true; // some stop condition for the example
}
}
int main()
{
std::thread manipulator1(manipulator1_kernel);
std::thread manipulator2(manipulator2_kernel);
std::thread visualizer(visualizer_kernel);
visualizer.join();
manipulator2.join();
manipulator1.join();
return 0;
}
如果您仍想使用 OpenMP,您能找到的最接近的可能是 tasks(我认为是从 OpenMP 3.0 开始)。我用得不多,但上面的程序可以重写为:
int main()
{
#pragma omp parallel
{
#pragma omp task
manipulator1_kernel();
#pragma omp task
manipulator2_kernel();
#pragma omp task
visualizer_kernel();
#pragma omp taskwait
}
return 0;
}
其余代码也可以更改为使用 OpenMP 功能,但我认为这回答了您的问题。
这种方法的主要问题是您必须为任务创建一个代码块以在 OpenMP parallel
中运行,这很容易使您的应用程序逻辑和结构的其余部分复杂化。
为了解决这个特殊问题,英特尔® 线程构建模块库包含特殊结构。 Intel® TBB 是有助于多线程编程的跨平台库。
我们可以将应用程序中涉及的实体视为四个不同的任务提供者。一种类型的任务是输入任务 - 那些提供输入数据的任务,另一种类型的任务由第一个操作例程提供,依此类推。
因此,用户唯一需要做的就是为这些任务提供正文。库中有几个 API 用于指定要处理的主体以及如何并行处理。其他一切(这里我指的是线程创建、任务执行之间的同步、工作平衡等)都由库完成。
我想到的最简单的解决方案变体是使用 parallel_pipeline 函数。这是原型:
#include "tbb/pipeline.h"
using namespace tbb;
int main() {
parallel_pipeline(/*specify max number of bodies executed in parallel, e.g.*/16,
make_filter<void, input_data_type>(
filter::serial_in_order, // read data sequentially
[](flow_control& fc) -> input_data_type {
if ( /*check some stop condition: EOF, etc.*/ ) {
fc.stop();
return input_data_type(); // return dummy value
}
auto input_data = read_data();
return input_data;
}
) &
make_filter<input_data_type, manipulator1_output_type>(
filter::parallel, // process data in parallel by the first manipulator
[](input_data_type elem) -> manipulator1_output_type {
auto processed_elem = manipulator1::process(elem);
return processed_elem;
}
) &
make_filter<manipulator1_output_type, manipulator2_output_type>(
filter::parallel, // process data in parallel by the second manipulator
[](manipulator1_output_type elem) -> manipulator2_output_type {
auto processed_elem = manipulator2::process(elem);
return processed_elem;
}
) &
make_filter<manipulator2_output_type, void>(
filter::serial_in_order, // visualize frame by frame
[](manipulator2_output_type elem) {
visualize(elem);
}
)
);
return 0;
}
前提是实现了必要的功能(read_data,可视化)。这里input_data_type
、manipulator1_output_type
等是管道阶段之间传递的类型,操纵器的process
函数对传递的参数进行必要的计算。
顺便说一句,为了避免使用锁和其他同步原语,您可以使用库中的 concurrent_bounded_queue 并将您的输入数据放入此队列中,通过可能不同的线程(例如,专用于 IO 操作),如简单如concurrent_bounded_queue_instance.push(elem)
,然后通过input_data_type elem; concurrent_bounded_queue_instance.pop(elem)
阅读。请注意,弹出一个项目在这里是一个阻塞操作。 concurrent_queue
提供非阻塞 try_pop
替代方案。
另一种可能性是使用 tbb::flow_graph
and its nodes for organizing the same pipelining scheme. Take a look at two examples that describe dependency and data flow graphs. You might need to use sequencer_node 来正确排序项目执行(如有必要)。
值得阅读 tbb 标记的 SO 问题,以了解其他人如何使用此库。
你实现了单线程版本吗?异形?
它们是关键的步骤,w/o它们你可以获得高度并行设计的最佳实现,只是要意识到瓶颈是 I/O 你的缓冲区 and/or 线程同步and/or 错误共享 and/or 缓存未命中或类似问题。
我会首先尝试一个简单的线程池,其中包含按顺序执行所有步骤的任务。然后在分析它是如何工作的之后,什么是 CPU 消耗等。我会尝试使用更复杂的工具 总是将它们的性能与第一个简单版本
进行比较
我正在尝试开发一个管道,其中数据首先被读取和处理,操作一次,以不同的方式操作,然后显示。我有一个设计,其中数据 IO 馈送到第一个操纵器读取的缓冲区。随后,第一个操纵器写入另一个缓冲区,该缓冲区在可能时由第二个操纵器读取。最后,第二个操纵器的输出被写入显示缓冲区,由可视化工具读取并使用 OpenGL 显示。
在我看来,这是一个相当简单的并行问题,其中每个任务都有自己的线程,并且它们通过数据缓冲区进行通信。但是,我遇到的所有关于线程程序的教程似乎都表明,多线程应该留给某些决定如何分配工作负载的中间件(如 OpenMP)。
我是开发多线程应用程序的新手,所以这可能是一个愚蠢的问题,但我所描述的是否可行,是否可以使用像 OpenMP 这样的中间件来完成?我意识到明显的答案是 "try it," 并且我想这样做,但是教程没有说明*如何*尝试它。
OpenMP 更适合轻松跨越多核 (SIMD) 的算法。其他情况也是可能的,但在您的情况下,我认为直接使用线程会更好,并且更容易编码和维护。
我将我的回答分成两部分:一个通用解决方案 没有 OpenMP,以及一些使用 OpenMP 的特定更改。
如评论中所述,您正面临 producer/consumer 问题,但有两次:一个线程正在填充缓冲区(生成一个项目),然后必须由第二个线程读取(和修改)一个(消耗)。您的问题的特殊性在于,第二个线程也是生产者(要绘制的图像),第三个线程是负责使用它的线程(可视化器)。
如您所知,P/C 问题是使用缓冲区(可能是循环缓冲区或生产项目队列)解决的,其中缓冲区的每个元素都被标记为生产或消费,并且在哪里线程在添加或从中获取项目时具有独占访问权限。
让我们在下面的示例程序中使用队列方法来解决您的问题。
- 生成的项目将存储在队列中。队列的前端包含最旧的元素,即必须首先使用的元素。
- 有两个队列:一个用于第一个操纵器产生的数据(并由第二个操纵器使用),另一个用于第二个操纵器产生的数据(并且将由另一个线程可视化) ).
- 生产阶段很简单:获得对相应队列的独占访问权并在末尾插入元素。
- 消费类似,但必须等待队列至少有一个元素(不为空)。
- 我添加了一些 sleeps 来模拟其他操作。
- 停止条件仅供说明之用。
注意:为了简单起见,我假设您可以使用 C++11 编译器。使用其他API实现也比较类似。
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <list>
using namespace std::chrono_literals;
std::mutex g_data_produced_by_m1_mutex;
std::list<int> g_data_produced_by_m1;
std::mutex g_data_produced_by_m2_mutex;
std::list<int> g_data_produced_by_m2;
std::atomic<bool> stop = false;
void manipulator1_kernel()
{
while (!stop) {
// Producer 1: generate data
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
g_data_produced_by_m1.push_back(rand());
}
std::this_thread::sleep_for(100ms);
}
}
void manipulator2_kernel()
{
int data;
while (!stop) {
// Consumer 1
while (!stop) { // wait until there is an item to be consumed
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
if (!g_data_produced_by_m1.empty()) { // is there data to be consumed?
data = g_data_produced_by_m1.front(); // consume
g_data_produced_by_m1.pop_front();
break;
}
}
std::this_thread::sleep_for(100ms);
}
// Producer 2: modify and send to the visualizer
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
g_data_produced_by_m2.push_back(5 * data);
}
std::this_thread::sleep_for(100ms);
}
}
void visualizer_kernel()
{
int data;
while (!stop) {
// Consumer 2
while (!stop) { // wait until there is an item to be visualized
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
if (!g_data_produced_by_m2.empty()) {
data = g_data_produced_by_m2.front();
g_data_produced_by_m2.pop_front();
break;
}
}
std::this_thread::sleep_for(100ms);
}
std::cout << data << std::endl; // render to display
std::this_thread::sleep_for(100ms);
if (data % 8 == 0) stop = true; // some stop condition for the example
}
}
int main()
{
std::thread manipulator1(manipulator1_kernel);
std::thread manipulator2(manipulator2_kernel);
std::thread visualizer(visualizer_kernel);
visualizer.join();
manipulator2.join();
manipulator1.join();
return 0;
}
如果您仍想使用 OpenMP,您能找到的最接近的可能是 tasks(我认为是从 OpenMP 3.0 开始)。我用得不多,但上面的程序可以重写为:
int main()
{
#pragma omp parallel
{
#pragma omp task
manipulator1_kernel();
#pragma omp task
manipulator2_kernel();
#pragma omp task
visualizer_kernel();
#pragma omp taskwait
}
return 0;
}
其余代码也可以更改为使用 OpenMP 功能,但我认为这回答了您的问题。
这种方法的主要问题是您必须为任务创建一个代码块以在 OpenMP parallel
中运行,这很容易使您的应用程序逻辑和结构的其余部分复杂化。
为了解决这个特殊问题,英特尔® 线程构建模块库包含特殊结构。 Intel® TBB 是有助于多线程编程的跨平台库。 我们可以将应用程序中涉及的实体视为四个不同的任务提供者。一种类型的任务是输入任务 - 那些提供输入数据的任务,另一种类型的任务由第一个操作例程提供,依此类推。
因此,用户唯一需要做的就是为这些任务提供正文。库中有几个 API 用于指定要处理的主体以及如何并行处理。其他一切(这里我指的是线程创建、任务执行之间的同步、工作平衡等)都由库完成。
我想到的最简单的解决方案变体是使用 parallel_pipeline 函数。这是原型:
#include "tbb/pipeline.h"
using namespace tbb;
int main() {
parallel_pipeline(/*specify max number of bodies executed in parallel, e.g.*/16,
make_filter<void, input_data_type>(
filter::serial_in_order, // read data sequentially
[](flow_control& fc) -> input_data_type {
if ( /*check some stop condition: EOF, etc.*/ ) {
fc.stop();
return input_data_type(); // return dummy value
}
auto input_data = read_data();
return input_data;
}
) &
make_filter<input_data_type, manipulator1_output_type>(
filter::parallel, // process data in parallel by the first manipulator
[](input_data_type elem) -> manipulator1_output_type {
auto processed_elem = manipulator1::process(elem);
return processed_elem;
}
) &
make_filter<manipulator1_output_type, manipulator2_output_type>(
filter::parallel, // process data in parallel by the second manipulator
[](manipulator1_output_type elem) -> manipulator2_output_type {
auto processed_elem = manipulator2::process(elem);
return processed_elem;
}
) &
make_filter<manipulator2_output_type, void>(
filter::serial_in_order, // visualize frame by frame
[](manipulator2_output_type elem) {
visualize(elem);
}
)
);
return 0;
}
前提是实现了必要的功能(read_data,可视化)。这里input_data_type
、manipulator1_output_type
等是管道阶段之间传递的类型,操纵器的process
函数对传递的参数进行必要的计算。
顺便说一句,为了避免使用锁和其他同步原语,您可以使用库中的 concurrent_bounded_queue 并将您的输入数据放入此队列中,通过可能不同的线程(例如,专用于 IO 操作),如简单如concurrent_bounded_queue_instance.push(elem)
,然后通过input_data_type elem; concurrent_bounded_queue_instance.pop(elem)
阅读。请注意,弹出一个项目在这里是一个阻塞操作。 concurrent_queue
提供非阻塞 try_pop
替代方案。
另一种可能性是使用 tbb::flow_graph
and its nodes for organizing the same pipelining scheme. Take a look at two examples that describe dependency and data flow graphs. You might need to use sequencer_node 来正确排序项目执行(如有必要)。
值得阅读 tbb 标记的 SO 问题,以了解其他人如何使用此库。
你实现了单线程版本吗?异形?
它们是关键的步骤,w/o它们你可以获得高度并行设计的最佳实现,只是要意识到瓶颈是 I/O 你的缓冲区 and/or 线程同步and/or 错误共享 and/or 缓存未命中或类似问题。
我会首先尝试一个简单的线程池,其中包含按顺序执行所有步骤的任务。然后在分析它是如何工作的之后,什么是 CPU 消耗等。我会尝试使用更复杂的工具 总是将它们的性能与第一个简单版本
进行比较