如何将矩阵提升为具有多个线程的幂?
How can I raise a matrix to a power with multiple threads?
我正在尝试将矩阵提升为多线程的幂,但我对线程不是很擅长。我还从键盘输入线程数,该数字在 [1,矩阵高度] 范围内,然后执行以下操作:
unsigned period = ceil((double)A.getHeight() / threadNum);
unsigned prev = 0, next = period;
for (unsigned i(0); i < threadNum; ++i) {
threads.emplace_back(&power<long long>, std::ref(result), std::ref(A), std::ref(B), prev, next, p);
if (next + period > A.getHeight()) {
prev = next;
next = A.getHeight();
}
else {
prev = next;
next += period;
}
}
我很容易用多个线程将一个矩阵乘以另一个矩阵,但这里的问题是一旦完成了 1 步,例如我需要将 A 乘以 3 次方,A^2 将是那一步,在那一步之后,我必须等待所有线程完成,然后才能继续执行 A^2*A。我怎样才能让我的线程等待呢?我正在使用 std::thread 的。
在第一个回复发布后,我意识到我忘了提到我只想创建这些线程一次,而不是为每个乘法步骤重新创建它们。
我先简单分解一下:
- 矩阵乘法实现多线程化
- 矩阵指数多次调用乘法。
类似的东西:
Mat multithreaded_multiply(Mat const& left, Mat const& right) {...}
Mat power(Mat const& M, int n)
{
// Handle degenerate cases here (n = 0, 1)
// Regular loop
Mat intermediate = M;
for (int i = 2; i <= n; ++i)
{
intermediate = multithreaded_multiply(M, intermediate);
}
}
为了等待 std::thread
,您有 method join()
。
我建议使用 condition_variable。
算法应该是这样的:
将矩阵拆分为 N 个线程的 N 个部分。
每个线程计算一次乘法所需的结果子矩阵。
然后它使用 fetch_add
递增原子 threads_finished
计数器并等待共享条件变量。
最后一个线程完成(fetch_add()+1 == 线程计数),通知所有线程,它们现在可以继续处理。
- 利润。
编辑:
这是如何停止线程的示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <algorithm>
#include <atomic>
void sync_threads(std::condition_variable & cv, std::mutex & mut, std::vector<int> & threads, const int idx) {
std::unique_lock<std::mutex> lock(mut);
threads[idx] = 1;
if(std::find(threads.begin(),threads.end(),0) == threads.end()) {
for(auto & i: threads)
i = 0;
cv.notify_all();
} else {
while(threads[idx])
cv.wait(lock);
}
}
int main(){
std::vector<std::thread> threads;
std::mutex mut;
std::condition_variable cv;
int max_threads = 10;
std::vector<int> thread_wait(max_threads,0);
for(int i = 0; i < max_threads; i++) {
threads.emplace_back([&,i](){
std::cout << "Thread "+ std::to_string(i)+" started\n";
sync_threads(cv,mut,thread_wait,i);
std::cout << "Continuing thread " + std::to_string(i) + "\n";
sync_threads(cv,mut,thread_wait,i);
std::cout << "Continuing thread for second time " + std::to_string(i) + "\n";
});
}
for(auto & i: threads)
i.join();
}
有趣的部分在这里:
void sync_threads(std::condition_variable & cv, std::mutex & mut, std::vector<int> & threads, const int idx) {
std::unique_lock<std::mutex> lock(mut); // Lock because we want to modify cv
threads[idx] = 1; // Set my idx to 1, so we know we are sleeping
if(std::find(threads.begin(),threads.end(),0) == threads.end()) {
// I'm the last thread, wake up everyone
for(auto & i: threads)
i = 0;
cv.notify_all();
} else { //I'm not the last thread - sleep until all are finished
while(threads[idx]) // In loop so, if we wake up unexpectedly, we go back to sleep. (Thanks for pointing that out Yakk)
cv.wait(lock);
}
}
不是编程而是数学答案:对于每个方阵都有一组所谓的 "eigenvalues" 和 "eigenvectors",因此 M * E_i = lambda_i * E_i。 M是矩阵,E_i是特征向量,lambda_i是特征值,只是一个复数。所以 M^n * E_i = lambda_i^n *E_i。所以你只需要一个复数的 n 次方而不是矩阵。特征向量是正交的,即任何向量 V = sum_i a_i * E_i。所以 M^n * V = sum_i a_i lambda^n E_i。
根据您的问题,这可能会显着加快速度。
这是一个mass_thread_pool
:
// launches n threads all doing task F with an index:
template<class F>
struct mass_thread_pool {
F f;
std::vector< std::thread > threads;
std::condition_variable cv;
std::mutex m;
size_t task_id = 0;
size_t finished_count = 0;
std::unique_ptr<std::promise<void>> task_done;
std::atomic<bool> finished;
void task( F f, size_t n, size_t cur_task ) {
//std::cout << "Thread " << n << " launched" << std::endl;
do {
f(n);
std::unique_lock<std::mutex> lock(m);
if (finished)
break;
++finished_count;
if (finished_count == threads.size())
{
//std::cout << "task set finished" << std::endl;
task_done->set_value();
finished_count = 0;
}
cv.wait(lock,[&]{if (finished) return true; if (cur_task == task_id) return false; cur_task=task_id; return true;});
} while(!finished);
//std::cout << finished << std::endl;
//std::cout << "Thread " << n << " finished" << std::endl;
}
mass_thread_pool() = delete;
mass_thread_pool(F fin):f(fin),finished(false) {}
mass_thread_pool(mass_thread_pool&&)=delete; // address is party of identity
std::future<void> kick( size_t n ) {
//std::cout << "kicking " << n << " threads off. Prior count is " << threads.size() << std::endl;
std::future<void> r;
{
std::unique_lock<std::mutex> lock(m);
++task_id;
task_done.reset( new std::promise<void>() );
finished_count = 0;
r = task_done->get_future();
while (threads.size() < n) {
size_t i = threads.size();
threads.emplace_back( &mass_thread_pool::task, this, f, i, task_id );
}
//std::cout << "count is now " << threads.size() << std::endl;
}
cv.notify_all();
return r;
}
~mass_thread_pool() {
//std::cout << "destroying thread pool" << std::endl;
finished = true;
cv.notify_all();
for (auto&& t:threads) {
//std::cout << "joining thread" << std::endl;
t.join();
}
//std::cout << "destroyed thread pool" << std::endl;
}
};
你用一个任务构建它,然后你 kick(77)
启动该任务的 77 个副本(每个副本都有不同的索引)。
kick
returns一个std::future<void>
。您必须等待这个未来完成所有任务。
然后你可以销毁线程池,或者再次调用kick(77)
重新启动任务。
想法是您传递给 mass_thread_pool
的函数对象可以访问您的输入和输出数据(例如,您要相乘的矩阵,或指向它们的指针)。每个 kick
都会导致它为每个索引调用一次您的函数。您负责将索引转换为任何偏移量。
Live example 我用它来为另一个 vector
中的条目加 1。在迭代之间,我们交换向量。这会执行 2000 次迭代,并启动 10 个线程,并调用 lambda 20000 次。
注意 auto&& pool = make_pool( lambda )
位。需要使用 auto&&
——因为线程池有指向自身的指针,所以我在大量线程池上禁用了移动和复制构造。如果您确实需要传递它,请创建一个指向线程池的唯一指针。
我 运行 遇到了 std::promise
重置的一些问题,所以我将其包装在 unique_ptr 中。这可能不是必需的。
我用来调试它的跟踪语句被注释掉了。
用不同的 n
调用 kick
可能有效也可能无效。肯定用较小的 n
调用它不会像你期望的那样工作(在这种情况下它会忽略 n
)。
在您调用 kick
之前不会进行任何处理。 kick
是 "kick off".
的缩写
...
对于您的问题,我要做的是制作一个拥有 mass_thread_pool
.
的乘数对象
乘法器有一个指向 3 个矩阵的指针(a
、b
和 out
)。 n 个子任务中的每一个都会生成 out
.
的一些子部分
您将 2 个矩阵传递给乘法器,它将指向 out
的指针设置为本地矩阵,并将 a
和 b
设置为传入的矩阵,执行 kick
,然后等待,然后 returns 本地矩阵。
对于幂,您使用上面的乘数构建一个二次方塔,同时根据指数的位乘法累加到您的结果中(再次使用上面的乘数)。
上面的更高级版本可以允许乘法和 std::future<Matrix>
s(以及未来矩阵的乘法)排队。
我正在尝试将矩阵提升为多线程的幂,但我对线程不是很擅长。我还从键盘输入线程数,该数字在 [1,矩阵高度] 范围内,然后执行以下操作:
unsigned period = ceil((double)A.getHeight() / threadNum);
unsigned prev = 0, next = period;
for (unsigned i(0); i < threadNum; ++i) {
threads.emplace_back(&power<long long>, std::ref(result), std::ref(A), std::ref(B), prev, next, p);
if (next + period > A.getHeight()) {
prev = next;
next = A.getHeight();
}
else {
prev = next;
next += period;
}
}
我很容易用多个线程将一个矩阵乘以另一个矩阵,但这里的问题是一旦完成了 1 步,例如我需要将 A 乘以 3 次方,A^2 将是那一步,在那一步之后,我必须等待所有线程完成,然后才能继续执行 A^2*A。我怎样才能让我的线程等待呢?我正在使用 std::thread 的。
在第一个回复发布后,我意识到我忘了提到我只想创建这些线程一次,而不是为每个乘法步骤重新创建它们。
我先简单分解一下:
- 矩阵乘法实现多线程化
- 矩阵指数多次调用乘法。
类似的东西:
Mat multithreaded_multiply(Mat const& left, Mat const& right) {...}
Mat power(Mat const& M, int n)
{
// Handle degenerate cases here (n = 0, 1)
// Regular loop
Mat intermediate = M;
for (int i = 2; i <= n; ++i)
{
intermediate = multithreaded_multiply(M, intermediate);
}
}
为了等待 std::thread
,您有 method join()
。
我建议使用 condition_variable。
算法应该是这样的:
将矩阵拆分为 N 个线程的 N 个部分。
每个线程计算一次乘法所需的结果子矩阵。
然后它使用
fetch_add
递增原子threads_finished
计数器并等待共享条件变量。最后一个线程完成(fetch_add()+1 == 线程计数),通知所有线程,它们现在可以继续处理。
- 利润。
编辑: 这是如何停止线程的示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <algorithm>
#include <atomic>
void sync_threads(std::condition_variable & cv, std::mutex & mut, std::vector<int> & threads, const int idx) {
std::unique_lock<std::mutex> lock(mut);
threads[idx] = 1;
if(std::find(threads.begin(),threads.end(),0) == threads.end()) {
for(auto & i: threads)
i = 0;
cv.notify_all();
} else {
while(threads[idx])
cv.wait(lock);
}
}
int main(){
std::vector<std::thread> threads;
std::mutex mut;
std::condition_variable cv;
int max_threads = 10;
std::vector<int> thread_wait(max_threads,0);
for(int i = 0; i < max_threads; i++) {
threads.emplace_back([&,i](){
std::cout << "Thread "+ std::to_string(i)+" started\n";
sync_threads(cv,mut,thread_wait,i);
std::cout << "Continuing thread " + std::to_string(i) + "\n";
sync_threads(cv,mut,thread_wait,i);
std::cout << "Continuing thread for second time " + std::to_string(i) + "\n";
});
}
for(auto & i: threads)
i.join();
}
有趣的部分在这里:
void sync_threads(std::condition_variable & cv, std::mutex & mut, std::vector<int> & threads, const int idx) {
std::unique_lock<std::mutex> lock(mut); // Lock because we want to modify cv
threads[idx] = 1; // Set my idx to 1, so we know we are sleeping
if(std::find(threads.begin(),threads.end(),0) == threads.end()) {
// I'm the last thread, wake up everyone
for(auto & i: threads)
i = 0;
cv.notify_all();
} else { //I'm not the last thread - sleep until all are finished
while(threads[idx]) // In loop so, if we wake up unexpectedly, we go back to sleep. (Thanks for pointing that out Yakk)
cv.wait(lock);
}
}
不是编程而是数学答案:对于每个方阵都有一组所谓的 "eigenvalues" 和 "eigenvectors",因此 M * E_i = lambda_i * E_i。 M是矩阵,E_i是特征向量,lambda_i是特征值,只是一个复数。所以 M^n * E_i = lambda_i^n *E_i。所以你只需要一个复数的 n 次方而不是矩阵。特征向量是正交的,即任何向量 V = sum_i a_i * E_i。所以 M^n * V = sum_i a_i lambda^n E_i。 根据您的问题,这可能会显着加快速度。
这是一个mass_thread_pool
:
// launches n threads all doing task F with an index:
template<class F>
struct mass_thread_pool {
F f;
std::vector< std::thread > threads;
std::condition_variable cv;
std::mutex m;
size_t task_id = 0;
size_t finished_count = 0;
std::unique_ptr<std::promise<void>> task_done;
std::atomic<bool> finished;
void task( F f, size_t n, size_t cur_task ) {
//std::cout << "Thread " << n << " launched" << std::endl;
do {
f(n);
std::unique_lock<std::mutex> lock(m);
if (finished)
break;
++finished_count;
if (finished_count == threads.size())
{
//std::cout << "task set finished" << std::endl;
task_done->set_value();
finished_count = 0;
}
cv.wait(lock,[&]{if (finished) return true; if (cur_task == task_id) return false; cur_task=task_id; return true;});
} while(!finished);
//std::cout << finished << std::endl;
//std::cout << "Thread " << n << " finished" << std::endl;
}
mass_thread_pool() = delete;
mass_thread_pool(F fin):f(fin),finished(false) {}
mass_thread_pool(mass_thread_pool&&)=delete; // address is party of identity
std::future<void> kick( size_t n ) {
//std::cout << "kicking " << n << " threads off. Prior count is " << threads.size() << std::endl;
std::future<void> r;
{
std::unique_lock<std::mutex> lock(m);
++task_id;
task_done.reset( new std::promise<void>() );
finished_count = 0;
r = task_done->get_future();
while (threads.size() < n) {
size_t i = threads.size();
threads.emplace_back( &mass_thread_pool::task, this, f, i, task_id );
}
//std::cout << "count is now " << threads.size() << std::endl;
}
cv.notify_all();
return r;
}
~mass_thread_pool() {
//std::cout << "destroying thread pool" << std::endl;
finished = true;
cv.notify_all();
for (auto&& t:threads) {
//std::cout << "joining thread" << std::endl;
t.join();
}
//std::cout << "destroyed thread pool" << std::endl;
}
};
你用一个任务构建它,然后你 kick(77)
启动该任务的 77 个副本(每个副本都有不同的索引)。
kick
returns一个std::future<void>
。您必须等待这个未来完成所有任务。
然后你可以销毁线程池,或者再次调用kick(77)
重新启动任务。
想法是您传递给 mass_thread_pool
的函数对象可以访问您的输入和输出数据(例如,您要相乘的矩阵,或指向它们的指针)。每个 kick
都会导致它为每个索引调用一次您的函数。您负责将索引转换为任何偏移量。
Live example 我用它来为另一个 vector
中的条目加 1。在迭代之间,我们交换向量。这会执行 2000 次迭代,并启动 10 个线程,并调用 lambda 20000 次。
注意 auto&& pool = make_pool( lambda )
位。需要使用 auto&&
——因为线程池有指向自身的指针,所以我在大量线程池上禁用了移动和复制构造。如果您确实需要传递它,请创建一个指向线程池的唯一指针。
我 运行 遇到了 std::promise
重置的一些问题,所以我将其包装在 unique_ptr 中。这可能不是必需的。
我用来调试它的跟踪语句被注释掉了。
用不同的 n
调用 kick
可能有效也可能无效。肯定用较小的 n
调用它不会像你期望的那样工作(在这种情况下它会忽略 n
)。
在您调用 kick
之前不会进行任何处理。 kick
是 "kick off".
...
对于您的问题,我要做的是制作一个拥有 mass_thread_pool
.
乘法器有一个指向 3 个矩阵的指针(a
、b
和 out
)。 n 个子任务中的每一个都会生成 out
.
您将 2 个矩阵传递给乘法器,它将指向 out
的指针设置为本地矩阵,并将 a
和 b
设置为传入的矩阵,执行 kick
,然后等待,然后 returns 本地矩阵。
对于幂,您使用上面的乘数构建一个二次方塔,同时根据指数的位乘法累加到您的结果中(再次使用上面的乘数)。
上面的更高级版本可以允许乘法和 std::future<Matrix>
s(以及未来矩阵的乘法)排队。