C++11 std::thread 队列消费者
C++11 std::thread queue consumer
我正在考虑实现一个 FIFO 线程队列,其中创建了线程 lambda,但一次只处理 X 个线程。
我假设线程会捕获一些局部变量,但没有别的。
我正在尝试做的一个最小示例是:
#include <stdio.h>
#include <mutex>
#include <thread>
#include <atomic>
#include <vector>
#include <algorithm>
#include <condition_variable>
int main()
{
std::vector<std::thread> threads;
std::condition_variable condition;
std::mutex mutex;
std::atomic<unsigned int> thread_count( 0 );
for ( int i = 0; i < 20; i++ )
{
std::unique_lock<std::mutex> lock ( mutex );
condition.wait( lock, [&]{ return thread_count < 8;} );
std::thread t = std::thread([&,i]()
{
printf ("I am Thread #: %d with current thread count: %u\n", i, unsigned( thread_count ) );
thread_count--;
});
threads.push_back( std::move( t ) );
thread_count++;
}
for ( auto & t : threads ) t.join();
return 0;
}
我想在那个 for 循环中创建线程 objects/lamdas,然后让它们 运行 最多 8 个线程。
当前输出为:
I am Thread #: 0 with current thread count: 3
I am Thread #: 3 with current thread count: 4
I am Thread #: 2 with current thread count: 4
I am Thread #: 5 with current thread count: 3
I am Thread #: 1 with current thread count: 3
I am Thread #: 4 with current thread count: 5
I am Thread #: 6 with current thread count: 1
I am Thread #: 7 with current thread count: 1
I am Thread #: 8 with current thread count: 1
I am Thread #: 9 with current thread count: 1
I am Thread #: 10 with current thread count: 1
I am Thread #: 11 with current thread count: 1
I am Thread #: 12 with current thread count: 1
I am Thread #: 13 with current thread count: 1
I am Thread #: 14 with current thread count: 1
I am Thread #: 15 with current thread count: 1
I am Thread #: 16 with current thread count: 1
I am Thread #: 17 with current thread count: 1
I am Thread #: 18 with current thread count: 1
I am Thread #: 19 with current thread count: 1
这显然永远不会达到 8 个线程的最大数量。
你可以自己看看here
您正在创建的线程似乎比主循环创建新线程终止得更快。所以你没有看到 8 个并行线程在运行,而是更少。
请注意,程序输出将是不确定的,因为线程调度有些不可预测。如果调用频率足够高,您的程序实际上可能不时有 8 个并行线程 运行...
如果您更改 threadfunc lambda 以使其不会立即终止,您将看到线程堆积起来。您可以通过在 threadfunc lambda 中添加对 sleep
的一些调用来实现。
请注意,您的程序可能会挂起,因为主循环将等待条件变量。但是没有人发出信号,所以它可能会永远挂起。
因此您可能希望将 threadfunc lambda 末尾的 thread_count--;
行更改为:
std::unique_lock<std::mutex> lock ( mutex );
thread_count--;
condition.notify_one();
这不仅会减少线程数,还会向条件变量发出信号,以便主循环可以唤醒并在线程终止时恢复创建额外的线程。
完整的工作示例,每个线程随机延迟休眠(假装一些实际工作):
#include <stdio.h>
#include <mutex>
#include <thread>
#include <atomic>
#include <vector>
#include <algorithm>
#include <condition_variable>
#include <chrono>
#include <random>
int main()
{
std::vector<std::thread> threads;
std::condition_variable condition;
std::mutex mutex;
std::atomic<unsigned int> thread_count(0);
std::mt19937 generator;
for (int i = 0; i < 20; i++) {
std::unique_lock<std::mutex> lock (mutex);
condition.wait(lock, [&]{ return thread_count < 8; });
// random sleep delay
auto delay = generator() % 10000;
std::thread t = std::thread([&, i, delay]() {
printf("I am Thread #: %d with current thread count: %u\n", i, unsigned(thread_count));
// ok to access the generator here because
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
printf("I am Thread #: %d and I am done\n", i);
// signal that this thread is done
std::unique_lock<std::mutex> lock(mutex);
thread_count--;
condition.notify_one();
});
threads.push_back(std::move(t));
thread_count++;
}
for (auto& t : threads) {
t.join();
}
return 0;
}
我正在考虑实现一个 FIFO 线程队列,其中创建了线程 lambda,但一次只处理 X 个线程。
我假设线程会捕获一些局部变量,但没有别的。
我正在尝试做的一个最小示例是:
#include <stdio.h>
#include <mutex>
#include <thread>
#include <atomic>
#include <vector>
#include <algorithm>
#include <condition_variable>
int main()
{
std::vector<std::thread> threads;
std::condition_variable condition;
std::mutex mutex;
std::atomic<unsigned int> thread_count( 0 );
for ( int i = 0; i < 20; i++ )
{
std::unique_lock<std::mutex> lock ( mutex );
condition.wait( lock, [&]{ return thread_count < 8;} );
std::thread t = std::thread([&,i]()
{
printf ("I am Thread #: %d with current thread count: %u\n", i, unsigned( thread_count ) );
thread_count--;
});
threads.push_back( std::move( t ) );
thread_count++;
}
for ( auto & t : threads ) t.join();
return 0;
}
我想在那个 for 循环中创建线程 objects/lamdas,然后让它们 运行 最多 8 个线程。
当前输出为:
I am Thread #: 0 with current thread count: 3
I am Thread #: 3 with current thread count: 4
I am Thread #: 2 with current thread count: 4
I am Thread #: 5 with current thread count: 3
I am Thread #: 1 with current thread count: 3
I am Thread #: 4 with current thread count: 5
I am Thread #: 6 with current thread count: 1
I am Thread #: 7 with current thread count: 1
I am Thread #: 8 with current thread count: 1
I am Thread #: 9 with current thread count: 1
I am Thread #: 10 with current thread count: 1
I am Thread #: 11 with current thread count: 1
I am Thread #: 12 with current thread count: 1
I am Thread #: 13 with current thread count: 1
I am Thread #: 14 with current thread count: 1
I am Thread #: 15 with current thread count: 1
I am Thread #: 16 with current thread count: 1
I am Thread #: 17 with current thread count: 1
I am Thread #: 18 with current thread count: 1
I am Thread #: 19 with current thread count: 1
这显然永远不会达到 8 个线程的最大数量。
你可以自己看看here
您正在创建的线程似乎比主循环创建新线程终止得更快。所以你没有看到 8 个并行线程在运行,而是更少。
请注意,程序输出将是不确定的,因为线程调度有些不可预测。如果调用频率足够高,您的程序实际上可能不时有 8 个并行线程 运行...
如果您更改 threadfunc lambda 以使其不会立即终止,您将看到线程堆积起来。您可以通过在 threadfunc lambda 中添加对 sleep
的一些调用来实现。
请注意,您的程序可能会挂起,因为主循环将等待条件变量。但是没有人发出信号,所以它可能会永远挂起。
因此您可能希望将 threadfunc lambda 末尾的 thread_count--;
行更改为:
std::unique_lock<std::mutex> lock ( mutex );
thread_count--;
condition.notify_one();
这不仅会减少线程数,还会向条件变量发出信号,以便主循环可以唤醒并在线程终止时恢复创建额外的线程。
完整的工作示例,每个线程随机延迟休眠(假装一些实际工作):
#include <stdio.h>
#include <mutex>
#include <thread>
#include <atomic>
#include <vector>
#include <algorithm>
#include <condition_variable>
#include <chrono>
#include <random>
int main()
{
std::vector<std::thread> threads;
std::condition_variable condition;
std::mutex mutex;
std::atomic<unsigned int> thread_count(0);
std::mt19937 generator;
for (int i = 0; i < 20; i++) {
std::unique_lock<std::mutex> lock (mutex);
condition.wait(lock, [&]{ return thread_count < 8; });
// random sleep delay
auto delay = generator() % 10000;
std::thread t = std::thread([&, i, delay]() {
printf("I am Thread #: %d with current thread count: %u\n", i, unsigned(thread_count));
// ok to access the generator here because
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
printf("I am Thread #: %d and I am done\n", i);
// signal that this thread is done
std::unique_lock<std::mutex> lock(mutex);
thread_count--;
condition.notify_one();
});
threads.push_back(std::move(t));
thread_count++;
}
for (auto& t : threads) {
t.join();
}
return 0;
}