C++中的线程队列
Threading queue in c++
目前正在做一个项目,目前正在为线程和队列而苦苦挣扎,问题是所有线程都在队列中获取相同的项目。
可重现的例子:
#include <iostream>
#include <queue>
#include <thread>
using namespace std;
void Test(queue<string> queue){
while (!queue.empty()) {
string proxy = queue.front();
cout << proxy << "\n";
queue.pop();
}
}
int main()
{
queue<string> queue;
queue.push("101.132.186.39:9090");
queue.push("95.85.24.83:8118");
queue.push("185.211.193.162:8080");
queue.push("87.106.37.89:8888");
queue.push("159.203.61.169:8080");
std::vector<std::thread> ThreadVector;
for (int i = 0; i <= 10; i++){
ThreadVector.emplace_back([&]() {Test(queue); });
}
for (auto& t : ThreadVector){
t.join();
}
ThreadVector.clear();
return 0;
}
您为每个线程提供了自己的队列副本。我想你想要的是所有线程都在同一个队列上工作,为此当多个线程在 shared 队列上工作时你需要使用一些同步机制,因为 std 队列不是线程安全。
编辑:小提示:在你的代码中你生成了 11 个线程而不是 10 个。
编辑 2:好的,尝试从以下开始:
std::mutex lock_work;
std::mutex lock_io;
void Test(queue<string>& queue){
while (!queue.empty()) {
string proxy;
{
std::lock_guard<std::mutex> lock(lock_work);
proxy = queue.front();
queue.pop();
}
{
std::lock_guard<std::mutex> lock(lock_io);
cout << proxy << "\n";
}
}
}
看看这个片段:
void Test(std::queue<std::string> queue) { /* ... */ }
此处将队列对象的副本传递给线程。
这个副本对于每个线程都是本地的,所以它会在每个线程退出后被销毁,所以最终你的程序不会对驻留在 main()
中的实际 queue
对象产生任何影响功能。
要解决此问题,您需要使参数采用引用或指针:
void Test(std::queue<std::string>& queue) { /* ... */ }
这使得参数直接引用存在于 main()
中的 queue
对象,而不是创建副本。
现在,上面的代码仍然不正确,因为 queue
容易出现 data-race and neither std::queue
nor std::cout
is thread-safe and can get interrupted by another thread while currently being accessed by one. To prevent this, use a std::mutex
:
// ...
#include <mutex>
// ...
// The mutex protects the 'queue' object from being subjected to data-race amongst different threads
// Additionally 'io_mut' is used to protect the streaming operations done with 'std::cout'
std::mutex mut, io_mut;
void Test(std::queue<std::string>& queue) {
std::queue<std::string> tmp;
{
// Swap the actual object with a local temporary object while being protected by the mutex
std::lock_guard<std::mutex> lock(mut);
std::swap(tmp, queue);
}
while (!tmp.empty()) {
std::string proxy = tmp.front();
{
// Call to 'std::cout' needs to be synchronized
std::lock_guard<std::mutex> lock(io_mut);
std::cout << proxy << "\n";
}
tmp.pop();
}
}
这会同步每个线程调用并防止在 queue
仍在被线程访问时从任何其他线程访问。
编辑:
或者,我认为让每个线程等待其中一个线程收到您推送到 std::queue
的通知会更快。您可以通过使用 std::condition_variable
:
来做到这一点
// ...
#include <mutex>
#include <condition_variable>
// ...
std::mutex mut1, mut2;
std::condition_variable cond;
void Test(std::queue<std::string>& queue, std::chrono::milliseconds timeout = std::chrono::milliseconds{10}) {
std::unique_lock<std::mutex> lock(mut1);
// Wait until 'queue' is not empty...
cond.wait(lock, [queue] { return queue.empty(); });
while (!queue.empty()) {
std::string proxy = std::move(queue.front());
std::cout << proxy << "\n";
queue.pop();
}
}
// ...
int main() {
std::queue<string> queue;
std::vector<std::thread> ThreadVector;
for (int i = 0; i <= 10; i++)
ThreadVector.emplace_back([&]() { Test(queue); });
// Notify the vectors of each 'push()' call to 'queue'
{
std::unique_lock<std::mutex> lock(mut2);
queue.push("101.132.186.39:9090");
cond.notify_one();
}
{
std::unique_lock<std::mutex> lock(mut2);
queue.push("95.85.24.83:8118");
cond.notify_one();
}
{
std::unique_lock<std::mutex> lock(mut2);
queue.push("185.211.193.162:8080");
cond.notify_one();
}
{
std::unique_lock<std::mutex> lock(mut2);
queue.push("87.106.37.89:8888");
cond.notify_one();
}
{
std::unique_lock<std::mutex> lock(mut2);
queue.push("159.203.61.169:8080");
cond.notify_one();
}
for (auto& t : ThreadVector)
t.join();
ThreadVector.clear();
}
目前正在做一个项目,目前正在为线程和队列而苦苦挣扎,问题是所有线程都在队列中获取相同的项目。
可重现的例子:
#include <iostream>
#include <queue>
#include <thread>
using namespace std;
void Test(queue<string> queue){
while (!queue.empty()) {
string proxy = queue.front();
cout << proxy << "\n";
queue.pop();
}
}
int main()
{
queue<string> queue;
queue.push("101.132.186.39:9090");
queue.push("95.85.24.83:8118");
queue.push("185.211.193.162:8080");
queue.push("87.106.37.89:8888");
queue.push("159.203.61.169:8080");
std::vector<std::thread> ThreadVector;
for (int i = 0; i <= 10; i++){
ThreadVector.emplace_back([&]() {Test(queue); });
}
for (auto& t : ThreadVector){
t.join();
}
ThreadVector.clear();
return 0;
}
您为每个线程提供了自己的队列副本。我想你想要的是所有线程都在同一个队列上工作,为此当多个线程在 shared 队列上工作时你需要使用一些同步机制,因为 std 队列不是线程安全。
编辑:小提示:在你的代码中你生成了 11 个线程而不是 10 个。
编辑 2:好的,尝试从以下开始:
std::mutex lock_work;
std::mutex lock_io;
void Test(queue<string>& queue){
while (!queue.empty()) {
string proxy;
{
std::lock_guard<std::mutex> lock(lock_work);
proxy = queue.front();
queue.pop();
}
{
std::lock_guard<std::mutex> lock(lock_io);
cout << proxy << "\n";
}
}
}
看看这个片段:
void Test(std::queue<std::string> queue) { /* ... */ }
此处将队列对象的副本传递给线程。
这个副本对于每个线程都是本地的,所以它会在每个线程退出后被销毁,所以最终你的程序不会对驻留在 main()
中的实际 queue
对象产生任何影响功能。
要解决此问题,您需要使参数采用引用或指针:
void Test(std::queue<std::string>& queue) { /* ... */ }
这使得参数直接引用存在于 main()
中的 queue
对象,而不是创建副本。
现在,上面的代码仍然不正确,因为 queue
容易出现 data-race and neither std::queue
nor std::cout
is thread-safe and can get interrupted by another thread while currently being accessed by one. To prevent this, use a std::mutex
:
// ...
#include <mutex>
// ...
// The mutex protects the 'queue' object from being subjected to data-race amongst different threads
// Additionally 'io_mut' is used to protect the streaming operations done with 'std::cout'
std::mutex mut, io_mut;
void Test(std::queue<std::string>& queue) {
std::queue<std::string> tmp;
{
// Swap the actual object with a local temporary object while being protected by the mutex
std::lock_guard<std::mutex> lock(mut);
std::swap(tmp, queue);
}
while (!tmp.empty()) {
std::string proxy = tmp.front();
{
// Call to 'std::cout' needs to be synchronized
std::lock_guard<std::mutex> lock(io_mut);
std::cout << proxy << "\n";
}
tmp.pop();
}
}
这会同步每个线程调用并防止在 queue
仍在被线程访问时从任何其他线程访问。
编辑:
或者,我认为让每个线程等待其中一个线程收到您推送到 std::queue
的通知会更快。您可以通过使用 std::condition_variable
:
// ...
#include <mutex>
#include <condition_variable>
// ...
std::mutex mut1, mut2;
std::condition_variable cond;
void Test(std::queue<std::string>& queue, std::chrono::milliseconds timeout = std::chrono::milliseconds{10}) {
std::unique_lock<std::mutex> lock(mut1);
// Wait until 'queue' is not empty...
cond.wait(lock, [queue] { return queue.empty(); });
while (!queue.empty()) {
std::string proxy = std::move(queue.front());
std::cout << proxy << "\n";
queue.pop();
}
}
// ...
int main() {
std::queue<string> queue;
std::vector<std::thread> ThreadVector;
for (int i = 0; i <= 10; i++)
ThreadVector.emplace_back([&]() { Test(queue); });
// Notify the vectors of each 'push()' call to 'queue'
{
std::unique_lock<std::mutex> lock(mut2);
queue.push("101.132.186.39:9090");
cond.notify_one();
}
{
std::unique_lock<std::mutex> lock(mut2);
queue.push("95.85.24.83:8118");
cond.notify_one();
}
{
std::unique_lock<std::mutex> lock(mut2);
queue.push("185.211.193.162:8080");
cond.notify_one();
}
{
std::unique_lock<std::mutex> lock(mut2);
queue.push("87.106.37.89:8888");
cond.notify_one();
}
{
std::unique_lock<std::mutex> lock(mut2);
queue.push("159.203.61.169:8080");
cond.notify_one();
}
for (auto& t : ThreadVector)
t.join();
ThreadVector.clear();
}