std::future get() 在 wait_for() 状态准备就绪时阻塞,而 wait() returns
std::future get() blocks when wait_for() status is ready and wait() returns
我希望下面的代码每次都能通过所有断言并成功完成。目前似乎 std::future.get()
每次都在两个分支中阻塞。尽管 wait_for()
显示状态为 ready
并且 wait()
立即返回,但它会永远阻塞。 gcc 7.4.0 和 clang 6.0.0 的结果相同。
#include <chrono>
#include <condition_variable>
#include <future>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <cassert>
#include <unistd.h>
template<class T>
class BlockingQueue {
std::queue<T> theQueue;
std::mutex mtx;
std::condition_variable hasDataCondition;
public:
void push(const T& t) {
std::unique_lock<std::mutex> lock{mtx};
theQueue.push(t);
hasDataCondition.notify_all();
}
T popWhenAvailable(int i = 0) {
std::unique_lock<std::mutex> lock{mtx};
if (theQueue.empty()) {
std::cout << "Waiting " << i << std::endl;
hasDataCondition.wait(lock, [this]{return not theQueue.empty();});
std::cout << "Done waiting " << i << std::endl;
}
T front = std::move(theQueue.front());
theQueue.pop();
std::cout << "Got value " << front << " and popped it on " << i << std::endl;
return front;
}
};
int main(int argc, char** argv) {
BlockingQueue<int> q;
auto futureInt0 = std::async(std::launch::async, [&]{return q.popWhenAvailable();});
auto futureInt1 = std::async(std::launch::async, [&]{return q.popWhenAvailable(1);});
std::cout << "Starting threads..." << std::endl;
sleep(2);
assert(futureInt0.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
assert(futureInt1.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
std::cout << "Pushing data..." << std::endl;
q.push(4);
std::cout << "Pushed! Checking results..." << std::endl;
if (futureInt0.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready) {
std::cout << "Future 0 ready." << std::endl;
assert(futureInt1.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
std::cout << "Future 1 isn't ready (it shouldn't be)." << std::endl;
std::cout << "Trying to wait() for future 0, should return immediately..." << std::endl;
futureInt0.wait();
std::cout << "Now get() the value..." << std::endl;
assert(futureInt0.get() == 4);
} else {
std::cout << "Future 0 not ready. Trying future 1..." << std::endl;
assert(futureInt1.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready);
std::cout << "Future1 status is ready. Trying to wait(), should return immediately..." << std::endl;
futureInt1.wait();
std::cout << "Now get() the value..." << std::endl;
assert(futureInt1.get() == 4);
}
}
有意思!我发现的第一件事是,正如 @rafix07 所指出的那样,您正在等待第二个线程弹出一些东西。我不确定最终的 objective 是什么,但这可行。我在 MSVC 上测试过,这里是 g++ on Coliru
#include <chrono>
#include <condition_variable>
#include <future>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <cassert>
template<class T>
class BlockingQueue {
std::queue<T> theQueue;
std::mutex mtx;
std::condition_variable hasDataCondition;
public:
void push(const T& t) {
std::unique_lock<std::mutex> lock{ mtx };
theQueue.push(t);
hasDataCondition.notify_all();
}
T popWhenAvailable(int i) {
std::unique_lock<std::mutex> lock{ mtx };
std::cout << "popWhenAvailable: " << i << std::endl;
if (theQueue.empty()) {
std::cout << "Waiting " << i << std::endl;
hasDataCondition.wait(lock, [this] {return ! theQueue.empty(); });
std::cout << "Done waiting " << i << std::endl;
}
T front = std::move(theQueue.front());
theQueue.pop();
std::cout << "Got value " << front << " and popped it on " << i << std::endl;
return front;
}
};
int main(int argc, char** argv) {
using namespace std::chrono_literals;
BlockingQueue<int> q;
auto futureInt0 = std::async(std::launch::async, [&] {return q.popWhenAvailable(0); });
auto futureInt1 = std::async(std::launch::async, [&] {return q.popWhenAvailable(1); });
std::cout << "Starting threads...\n" << std::endl;
std::this_thread::sleep_for(1000ms);
assert(futureInt0.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
assert(futureInt1.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
std::cout << "Pushing data..." << std::endl;
q.push(4);
std::cout << "Pushed! Checking results..." << std::endl;
std::pair<bool, bool> done = { false,false };
for (;;) {
if (!done.first && futureInt0.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready) {
std::cout << "Future 0 ready." << std::endl;
futureInt0.wait();
std::cout << "Now get() the value 0: " << futureInt0.get() << std::endl;
done.first = true;
}
else if(!done.second && futureInt1.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready) {
std::cout << "Future 1 ready." << std::endl;
futureInt1.wait();
std::cout << "Now get() the value 1: " << futureInt1.get() << std::endl;
done.second = true;
}
if (done.first && done.second)
break;
else if(done.first || done.second)
q.push(8);
}
}
我希望下面的代码每次都能通过所有断言并成功完成。目前似乎 std::future.get()
每次都在两个分支中阻塞。尽管 wait_for()
显示状态为 ready
并且 wait()
立即返回,但它会永远阻塞。 gcc 7.4.0 和 clang 6.0.0 的结果相同。
#include <chrono>
#include <condition_variable>
#include <future>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <cassert>
#include <unistd.h>
template<class T>
class BlockingQueue {
std::queue<T> theQueue;
std::mutex mtx;
std::condition_variable hasDataCondition;
public:
void push(const T& t) {
std::unique_lock<std::mutex> lock{mtx};
theQueue.push(t);
hasDataCondition.notify_all();
}
T popWhenAvailable(int i = 0) {
std::unique_lock<std::mutex> lock{mtx};
if (theQueue.empty()) {
std::cout << "Waiting " << i << std::endl;
hasDataCondition.wait(lock, [this]{return not theQueue.empty();});
std::cout << "Done waiting " << i << std::endl;
}
T front = std::move(theQueue.front());
theQueue.pop();
std::cout << "Got value " << front << " and popped it on " << i << std::endl;
return front;
}
};
int main(int argc, char** argv) {
BlockingQueue<int> q;
auto futureInt0 = std::async(std::launch::async, [&]{return q.popWhenAvailable();});
auto futureInt1 = std::async(std::launch::async, [&]{return q.popWhenAvailable(1);});
std::cout << "Starting threads..." << std::endl;
sleep(2);
assert(futureInt0.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
assert(futureInt1.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
std::cout << "Pushing data..." << std::endl;
q.push(4);
std::cout << "Pushed! Checking results..." << std::endl;
if (futureInt0.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready) {
std::cout << "Future 0 ready." << std::endl;
assert(futureInt1.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
std::cout << "Future 1 isn't ready (it shouldn't be)." << std::endl;
std::cout << "Trying to wait() for future 0, should return immediately..." << std::endl;
futureInt0.wait();
std::cout << "Now get() the value..." << std::endl;
assert(futureInt0.get() == 4);
} else {
std::cout << "Future 0 not ready. Trying future 1..." << std::endl;
assert(futureInt1.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready);
std::cout << "Future1 status is ready. Trying to wait(), should return immediately..." << std::endl;
futureInt1.wait();
std::cout << "Now get() the value..." << std::endl;
assert(futureInt1.get() == 4);
}
}
有意思!我发现的第一件事是,正如 @rafix07 所指出的那样,您正在等待第二个线程弹出一些东西。我不确定最终的 objective 是什么,但这可行。我在 MSVC 上测试过,这里是 g++ on Coliru
#include <chrono>
#include <condition_variable>
#include <future>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <cassert>
template<class T>
class BlockingQueue {
std::queue<T> theQueue;
std::mutex mtx;
std::condition_variable hasDataCondition;
public:
void push(const T& t) {
std::unique_lock<std::mutex> lock{ mtx };
theQueue.push(t);
hasDataCondition.notify_all();
}
T popWhenAvailable(int i) {
std::unique_lock<std::mutex> lock{ mtx };
std::cout << "popWhenAvailable: " << i << std::endl;
if (theQueue.empty()) {
std::cout << "Waiting " << i << std::endl;
hasDataCondition.wait(lock, [this] {return ! theQueue.empty(); });
std::cout << "Done waiting " << i << std::endl;
}
T front = std::move(theQueue.front());
theQueue.pop();
std::cout << "Got value " << front << " and popped it on " << i << std::endl;
return front;
}
};
int main(int argc, char** argv) {
using namespace std::chrono_literals;
BlockingQueue<int> q;
auto futureInt0 = std::async(std::launch::async, [&] {return q.popWhenAvailable(0); });
auto futureInt1 = std::async(std::launch::async, [&] {return q.popWhenAvailable(1); });
std::cout << "Starting threads...\n" << std::endl;
std::this_thread::sleep_for(1000ms);
assert(futureInt0.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
assert(futureInt1.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
std::cout << "Pushing data..." << std::endl;
q.push(4);
std::cout << "Pushed! Checking results..." << std::endl;
std::pair<bool, bool> done = { false,false };
for (;;) {
if (!done.first && futureInt0.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready) {
std::cout << "Future 0 ready." << std::endl;
futureInt0.wait();
std::cout << "Now get() the value 0: " << futureInt0.get() << std::endl;
done.first = true;
}
else if(!done.second && futureInt1.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready) {
std::cout << "Future 1 ready." << std::endl;
futureInt1.wait();
std::cout << "Now get() the value 1: " << futureInt1.get() << std::endl;
done.second = true;
}
if (done.first && done.second)
break;
else if(done.first || done.second)
q.push(8);
}
}