如何让不同的线程一起填充一个数组?
How to let different threads fill an array together?
假设我有一些任务(Monte Carlo 模拟)我想 运行 并行。我想完成给定数量的任务,但任务花费的时间不同,因此不容易在线程上平均分配工作。另外:最后我需要单个向量(或数组)中的所有模拟结果。
所以我想出了以下方法:
int Max{1000000};
//SimResult is some struct with well-defined default value.
std::vector<SimResult> vec(/*length*/Max);//Initialize with default values of SimResult
int LastAdded{0};
void fill(int RandSeed)
{
Simulator sim{RandSeed};
while(LastAdded < Max)
{
// Do some work to bring foo to the desired state
//The duration of this work is subject to randomness
vec[LastAdded++]
= sim.GetResult();//Produces SimResult.
}
}
main()
{
//launch a bunch of std::async that start
auto fut1 = std::async(fill,1);
auto fut2 = std::async(fill,2);
//maybe some more tasks.
fut1.get();
fut2.get();
//do something with the results in vec.
}
我猜上面的代码会给出竞态条件。我正在寻找一种高效的方法来避免这种情况。要求:避免竞争条件(填满整个数组,不跳过);最终结果立即在数组中;性能。
阅读各种方法后,原子似乎是一个不错的选择,但我不确定哪种设置对我的情况最有效?甚至不确定 atomic 是否会削减它;也许需要一个保护 LastAdded 的互斥体?
由于您已经知道要处理多少个元素并且从不更改向量的大小,因此最简单的解决方案是让每个线程处理向量中它自己的部分。例如
更新
为了适应变化很大的计算时间,您应该保留当前代码,但通过对所有线程都相同的 std::lock_guard
. You will need a std::mutex
避免竞争条件,例如全局变量,或传递对每个线程的互斥量。
void fill(int RandSeed, std::mutex &nextItemMutex)
{
Simulator sim{RandSeed};
size_t workingIndex;
while(true)
{
{
// enter critical area
std::lock_guard<std::mutex> nextItemLock(nextItemMutex);
// Acquire next item
if(LastAdded < Max)
{
workingIndex = LastAdded;
LastAdded++;
}
else
{
break;
}
// lock is released when nextItemLock goes out of scope
}
// Do some work to bring foo to the desired state
// The duration of this work is subject to randomness
vec[workingIndex] = sim.GetResult();//Produces SimResult.
}
}
问题在于,同步非常昂贵。但与您 运行 的模拟相比,它可能 并不昂贵,所以它应该不会太差。
版本 2:
为了减少所需的同步量,您可以获取要处理的块,而不是单个项目:
void fill(int RandSeed, std::mutex &nextItemMutex, size_t blockSize)
{
Simulator sim{RandSeed};
size_t workingIndex;
while(true)
{
{
std::lock_guard<std::mutex> nextItemLock(nextItemMutex);
if(LastAdded < Max)
{
workingIndex = LastAdded;
LastAdded += blockSize;
}
else
{
break;
}
}
for(size_t i = workingIndex; i < workingIndex + blockSize && i < MAX; i++)
vec[i] = sim.GetResult();//Produces SimResult.
}
}
简单版
void fill(int RandSeed, size_t partitionStart, size_t partitionEnd)
{
Simulator sim{RandSeed};
for(size_t i = partitionStart; i < partitionEnd; i++)
{
// Do some work to bring foo to the desired state
// The duration of this work is subject to randomness
vec[i] = sim.GetResult();//Produces SimResult.
}
}
main()
{
//launch a bunch of std::async that start
auto fut1 = std::async(fill,1, 0, Max / 2);
auto fut2 = std::async(fill,2, Max / 2, Max);
// ...
}
我要说的一件事是,您需要非常小心地使用标准库随机数函数。如果你的 'Simulator' class 创建了一个生成器的实例,你不应该 运行 Monte Carlo 使用同一个对象并行模拟,因为你可能会得到重复的模式运行 之间的随机数,这会给你不准确的结果。
这方面的最佳做法是创建 N 个具有相同属性的模拟器对象,并为每个对象提供不同的随机种子。然后,您可以使用 OpenMP 将这些对象集中到多个线程中,OpenMP 是一种用于科学软件开发的通用并行编程模型。
std::vector<SimResult> generateResults(size_t N_runs, double seed)
{
std::vector<SimResult> results(N_runs);
#pragma omp parallel for
for(auto i = 0; i < N_runs; i++)
{
auto sim = Simulator(seed + i);
results[i] = sim.GetResult();
}
}
编辑:使用 OpenMP,您可以选择不同的调度模型,例如在线程之间动态拆分工作。你可以这样做:
#pragma omp parallel for schedule(dynamic, 16)
这将为每个线程提供 16 个项目的块以供一次处理。
假设我有一些任务(Monte Carlo 模拟)我想 运行 并行。我想完成给定数量的任务,但任务花费的时间不同,因此不容易在线程上平均分配工作。另外:最后我需要单个向量(或数组)中的所有模拟结果。
所以我想出了以下方法:
int Max{1000000};
//SimResult is some struct with well-defined default value.
std::vector<SimResult> vec(/*length*/Max);//Initialize with default values of SimResult
int LastAdded{0};
void fill(int RandSeed)
{
Simulator sim{RandSeed};
while(LastAdded < Max)
{
// Do some work to bring foo to the desired state
//The duration of this work is subject to randomness
vec[LastAdded++]
= sim.GetResult();//Produces SimResult.
}
}
main()
{
//launch a bunch of std::async that start
auto fut1 = std::async(fill,1);
auto fut2 = std::async(fill,2);
//maybe some more tasks.
fut1.get();
fut2.get();
//do something with the results in vec.
}
我猜上面的代码会给出竞态条件。我正在寻找一种高效的方法来避免这种情况。要求:避免竞争条件(填满整个数组,不跳过);最终结果立即在数组中;性能。
阅读各种方法后,原子似乎是一个不错的选择,但我不确定哪种设置对我的情况最有效?甚至不确定 atomic 是否会削减它;也许需要一个保护 LastAdded 的互斥体?
由于您已经知道要处理多少个元素并且从不更改向量的大小,因此最简单的解决方案是让每个线程处理向量中它自己的部分。例如
更新
为了适应变化很大的计算时间,您应该保留当前代码,但通过对所有线程都相同的 std::lock_guard
. You will need a std::mutex
避免竞争条件,例如全局变量,或传递对每个线程的互斥量。
void fill(int RandSeed, std::mutex &nextItemMutex)
{
Simulator sim{RandSeed};
size_t workingIndex;
while(true)
{
{
// enter critical area
std::lock_guard<std::mutex> nextItemLock(nextItemMutex);
// Acquire next item
if(LastAdded < Max)
{
workingIndex = LastAdded;
LastAdded++;
}
else
{
break;
}
// lock is released when nextItemLock goes out of scope
}
// Do some work to bring foo to the desired state
// The duration of this work is subject to randomness
vec[workingIndex] = sim.GetResult();//Produces SimResult.
}
}
问题在于,同步非常昂贵。但与您 运行 的模拟相比,它可能 并不昂贵,所以它应该不会太差。
版本 2:
为了减少所需的同步量,您可以获取要处理的块,而不是单个项目:
void fill(int RandSeed, std::mutex &nextItemMutex, size_t blockSize)
{
Simulator sim{RandSeed};
size_t workingIndex;
while(true)
{
{
std::lock_guard<std::mutex> nextItemLock(nextItemMutex);
if(LastAdded < Max)
{
workingIndex = LastAdded;
LastAdded += blockSize;
}
else
{
break;
}
}
for(size_t i = workingIndex; i < workingIndex + blockSize && i < MAX; i++)
vec[i] = sim.GetResult();//Produces SimResult.
}
}
简单版
void fill(int RandSeed, size_t partitionStart, size_t partitionEnd)
{
Simulator sim{RandSeed};
for(size_t i = partitionStart; i < partitionEnd; i++)
{
// Do some work to bring foo to the desired state
// The duration of this work is subject to randomness
vec[i] = sim.GetResult();//Produces SimResult.
}
}
main()
{
//launch a bunch of std::async that start
auto fut1 = std::async(fill,1, 0, Max / 2);
auto fut2 = std::async(fill,2, Max / 2, Max);
// ...
}
我要说的一件事是,您需要非常小心地使用标准库随机数函数。如果你的 'Simulator' class 创建了一个生成器的实例,你不应该 运行 Monte Carlo 使用同一个对象并行模拟,因为你可能会得到重复的模式运行 之间的随机数,这会给你不准确的结果。
这方面的最佳做法是创建 N 个具有相同属性的模拟器对象,并为每个对象提供不同的随机种子。然后,您可以使用 OpenMP 将这些对象集中到多个线程中,OpenMP 是一种用于科学软件开发的通用并行编程模型。
std::vector<SimResult> generateResults(size_t N_runs, double seed)
{
std::vector<SimResult> results(N_runs);
#pragma omp parallel for
for(auto i = 0; i < N_runs; i++)
{
auto sim = Simulator(seed + i);
results[i] = sim.GetResult();
}
}
编辑:使用 OpenMP,您可以选择不同的调度模型,例如在线程之间动态拆分工作。你可以这样做:
#pragma omp parallel for schedule(dynamic, 16)
这将为每个线程提供 16 个项目的块以供一次处理。