Worker thread suspend / resume implementation
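While trying to add suspend / resume functionality to my Worker [thread] class, I've run into an issue that I can't explain. (C++1y / VS2015)
The issue looks like a deadlock, but I can't seem to reproduce it once a debugger is attached and a breakpoint is set before a certain point (see #1), so it looks like a timing issue.
The fix that I could find (#2) doesn't make much sense to me, because it requires holding on to a mutex for longer, and client code might attempt to acquire other mutexes, which as I understand it actually increases the chance of a deadlock.
It does fix the issue, though.
The Worker loop: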
Job* job;
while (true)
{
    {
        std::unique_lock<std::mutex> lock(m_jobsMutex);
        m_workSemaphore.Wait(lock);
        if (m_jobs.empty() && m_finishing)
        {
            break;
        }
        // Take the next job
        ASSERT(!m_jobs.empty());
        job = m_jobs.front();
        m_jobs.pop_front();
    }
    bool done = false;
    bool wasSuspended = false;
    do
    {
        // #2
        {   // Removing this extra scoping seemingly fixes the issue BUT
            // incurs us holding on to m_suspendMutex while the job is Process()ing,
            // which might 1, be lengthy, 2, acquire other locks.
            std::unique_lock<std::mutex> lock(m_suspendMutex);
            if (m_isSuspended && !wasSuspended)
            {
                job->Suspend();
            }
            wasSuspended = m_isSuspended;
            m_suspendCv.wait(lock, [this] {
                return !m_isSuspended;
            });
            if (wasSuspended && !m_isSuspended)
            {
                job->Resume();
            }
            wasSuspended = m_isSuspended;
        }
        done = job->Process();
    }
    while (!done);
}
Suspend / Resume are simply:
void Worker::Suspend()
{
    std::unique_lock<std::mutex> lock(m_suspendMutex);
    ASSERT(!m_isSuspended);
    m_isSuspended = true;
}
void Worker::Resume()
{
    {
        std::unique_lock<std::mutex> lock(m_suspendMutex);
        ASSERT(m_isSuspended);
        m_isSuspended = false;
    }
    m_suspendCv.notify_one(); // notify_all() doesn't work either.
}
The (Visual Studio) test:
struct Job: Worker::Job
{
    int durationMs = 25;
    int chunks = 40;
    int executed = 0;
    bool Process()
    {
        auto now = std::chrono::system_clock::now();
        auto until = now + std::chrono::milliseconds(durationMs);
        while (std::chrono::system_clock::now() < until)
        { /* busy, busy */
        }
        ++executed;
        return executed < chunks;
    }
    void Suspend() { /* nothing here */ }
    void Resume() { /* nothing here */ }
};
auto worker = std::make_unique<Worker>();
Job j;
worker->Enqueue(j);
std::this_thread::sleep_for(std::chrono::milliseconds(j.durationMs)); // Wait at least one chunk.
worker->Suspend();
Assert::IsTrue(j.executed < j.chunks); // We've suspended before we finished.
const int testExec = j.executed;
std::this_thread::sleep_for(std::chrono::milliseconds(j.durationMs * 4));
Assert::IsTrue(j.executed == testExec); // We haven't moved on.
// #1
worker->Resume(); // Breaking before this call means that I won't see the issue.
worker->Finalize();
Assert::IsTrue(j.executed == j.chunks); // Now we've finished.
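What am I missing / doing wrong? Why does the Process()ing of the job have to be guarded by the suspend mutex?
EDIT: Resume() shouldn't have been holding on to the mutex at the time of notifying; that's fixed, but the issue persists.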
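Of course, the Process()ing of the job does not have to be guarded by the suspend mutex.
The access to j.executed, however, both for the asserts and for the increment, does need to be synchronized (either by making it a std::atomic<int>, or by guarding it with a mutex, etc.). The worker thread writes the plain int while the test thread reads it, which is a data race and therefore undefined behaviour.
It is still not clear why the issue manifested the way it did (I'm not writing to the variable on the main thread); it might be a case of undefined behaviour propagating backwards in time.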
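To illustrate the std::atomic<int> option mentioned above, here is a minimal sketch, not the original Worker class. CountingJob, PauseGate and RunToCompletion are hypothetical names introduced for this example only, and Process() here returns true when the job is finished, which is an assumption made for the sketch. The point is that the gate mutex is released before Process() runs, while the counter itself is safe to read from the test thread at any time.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the test's Job. The counter is atomic, so the
// worker thread may increment it while another thread reads it.
struct CountingJob
{
    int chunks = 40;
    std::atomic<int> executed{0};

    // Assumption for this sketch: returns true once all chunks are done.
    bool Process()
    {
        return ++executed >= chunks;
    }
};

// Hypothetical pause gate mirroring m_suspendMutex / m_suspendCv / m_isSuspended.
class PauseGate
{
public:
    void Suspend()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_isSuspended = true;
    }

    void Resume()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_isSuspended = false;
        }
        m_cv.notify_all(); // Notify after releasing the mutex.
    }

    // Blocks while suspended; the mutex is released again on return,
    // so it is never held while a job is processing.
    void WaitWhileSuspended()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return !m_isSuspended; });
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_isSuspended = false;
};

// Runs a job on a worker thread, suspends and resumes it once, and returns
// the final chunk count observed after the worker has been joined.
int RunToCompletion(int chunks)
{
    CountingJob job;
    job.chunks = chunks;
    PauseGate gate;

    std::thread worker([&job, &gate] {
        bool done = false;
        do
        {
            gate.WaitWhileSuspended();
            done = job.Process();
        } while (!done);
    });

    gate.Suspend();
    // Reading the counter here may overlap with an increment on the worker
    // thread; that is well-defined only because the counter is atomic.
    const int seenWhileSuspended = job.executed.load();
    assert(seenWhileSuspended <= chunks);
    gate.Resume();

    worker.join();
    return job.executed.load();
}
```

Calling RunToCompletion(40) from a test joins the worker before the final read. With a plain int counter, the read taken while suspended would race with the worker's increment.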