等到变量变为零
Wait until a variable becomes zero
我正在编写一个多线程程序,可以在单独的线程中执行一些任务。
有些操作需要在我的程序执行结束时等待它们。我已经为这样的 "important" 操作编写了简单的守卫:
class CPendingOperationGuard final
{
public:
CPendingOperationGuard()
{
InterlockedIncrementAcquire( &m_ullCounter );
}
~CPendingOperationGuard()
{
InterlockedDecrementAcquire( &m_ullCounter );
}
static bool WaitForAll( DWORD dwTimeOut )
{
// Here is a topic of my question
// Return false on timeout
// Return true if wait was successful
}
private:
static volatile ULONGLONG m_ullCounter;
};
用法很简单:
void ImportantTask()
{
CPendingOperationGuard guard;
// Do work
}
// ...
void StopExecution()
{
if(!CPendingOperationGuard::WaitForAll( 30000 )) {
// Handle error
}
}
问题是:如何有效地等到 m_ullCounter
变为零或直到超时。
我有两个想法:
要在另一个单独的线程中启动此函数并写入 WaitForSingleObject( hThread, dwTimeout )
:
DWORD WINAPI WaitWorker( LPVOID )
{
while(InterlockedCompareExchangeRelease( &m_ullCounter, 0, 0 ))
;
}
但它会 "eat" 几乎 100% 的 CPU 时间 - 坏主意。
第二种思路是让其他线程启动:
DWORD WINAPI WaitWorker( LPVOID )
{
while(InterlockedCompareExchangeRelease( &m_ullCounter, 0, 0 ))
Sleep( 0 );
}
但它会将执行上下文切换到内核模式并返回 - 在可能的任务中成本太高。也是个坏主意
问题是:
如何在我的变量变为零之前执行几乎为零的开销?也许没有单独的线程...主要条件是支持超时停止等待。
也许有人可以为我的任务提出完全不同的想法 - 等待所有已注册的操作(例如在 WinAPI 的 ThreadPools - 它的 API 有,例如, WaitForThreadpoolWaitCallbacks
执行等待所有注册的任务).
PS:无法使用 ThreadPool API 重写我的代码 :(
你需要像 Run-Down Protection 这样的任务来代替 CPendingOperationGuard
在开始操作之前,您调用 ExAcquireRundownProtection
and only if it return TRUE - begin execute operation. at the end you must call ExReleaseRundownProtection
所以模式必须是下一个
if (ExAcquireRundownProtection(&RunRef)) {
do_operation();
ExReleaseRundownProtection(&RunRef);
}
当您想停止此过程并等待所有活动调用 do_operation();
完成时 - 您调用 ExWaitForRundownProtectionRelease
(而不是 WaitWorker
)
调用 ExWaitForRundownProtectionRelease
后,ExAcquireRundownProtection
例程将 return FALSE(因此在此之后不会启动新的操作)。 ExWaitForRundownProtectionRelease
等待 return 直到所有调用 ExReleaseRundownProtection
例程以释放先前获得的 运行-down 保护(因此当所有当前(如果存在)操作完成时)。当所有未完成的访问都完成后,ExWaitForRundownProtectionRelease
returns
不幸的是,这个 api 仅在内核模式下由系统实现,在用户模式下没有模拟。然而自己并不难实现这样的想法
这是我的例子:
enum RundownState {
v_complete = 0, v_init = 0x80000000
};
template<typename T>
class RundownProtection
{
LONG _Value;
public:
_NODISCARD BOOL IsRundownBegin()
{
return 0 <= _Value;
}
_NODISCARD BOOL AcquireRP()
{
LONG Value, NewValue;
if (0 > (Value = _Value))
{
do
{
NewValue = InterlockedCompareExchangeNoFence(&_Value, Value + 1, Value);
if (NewValue == Value) return TRUE;
} while (0 > (Value = NewValue));
}
return FALSE;
}
void ReleaseRP()
{
if (InterlockedDecrement(&_Value) == v_complete)
{
static_cast<T*>(this)->RundownCompleted();
}
}
void Rundown_l()
{
InterlockedBitTestAndResetNoFence(&_Value, 31);
}
void Rundown()
{
if (AcquireRP())
{
Rundown_l();
ReleaseRP();
}
}
RundownProtection(RundownState Value = v_init) : _Value(Value)
{
}
void Init()
{
_Value = v_init;
}
};
///////////////////////////////////////////////////////////////
class OperationGuard : public RundownProtection<OperationGuard>
{
friend RundownProtection<OperationGuard>;
HANDLE _hEvent;
void RundownCompleted()
{
SetEvent(_hEvent);
}
public:
OperationGuard() : _hEvent(0) {}
~OperationGuard()
{
if (_hEvent)
{
CloseHandle(_hEvent);
}
}
ULONG WaitComplete(ULONG dwMilliseconds = INFINITE)
{
return WaitForSingleObject(_hEvent, dwMilliseconds);
}
ULONG Init()
{
return (_hEvent = CreateEvent(0, 0, 0, 0)) ? NOERROR : GetLastError();
}
} g_guard;
//////////////////////////////////////////////
ULONG CALLBACK PendingOperationThread(void*)
{
while (g_guard.AcquireRP())
{
Sleep(1000);// do operation
g_guard.ReleaseRP();
}
return 0;
}
void demo()
{
if (g_guard.Init() == NOERROR)
{
if (HANDLE hThread = CreateThread(0, 0, PendingOperationThread, 0, 0, 0))
{
CloseHandle(hThread);
}
MessageBoxW(0, 0, L"UI Thread", MB_ICONINFORMATION|MB_OK);
g_guard.Rundown();
g_guard.WaitComplete();
}
}
为什么要等到 m_ullCounter
变成零还不够
如果我们从 m_ullCounter
中读到 0,这意味着只有此时没有活动操作。但是挂起的操作已经可以开始 在 我们检查 m_ullCounter == 0
之后。我们可以使用特殊标志(比如 bool g_bQuit
)并设置它。开始前的操作检查此标志,如果为真则不开始。但这无论如何不够
原始代码:
//工作线程
if (!g_bQuit) // (1)
{
// MessageBoxW(0, 0, L"simulate delay", MB_ICONWARNING);
InterlockedIncrement(&g_ullCounter); // (4)
// do operation
InterlockedDecrement(&g_ullCounter); // (5)
}
// 在这里等待所有操作完成
g_bQuit = true; // (2)
// wait on g_ullCounter == 0, how - not important
while (g_ullCounter) continue; // (3)
- 等待操作检查 g_bQuit 标志 (1) - 它仍然是假的,所以它
开始
- 工作线程被交换(使用 MessageBox 模拟)
- 我们设置 g_bQuit = true; // (2)
- we check/wait for
g_ullCounter == 0
, it 0 所以我们退出 (3)
- 工作线程唤醒(return 来自 MessageBox)并递增
g_ullCounter
(4)
这里的问题是 operation 可以使用一些我们已经在 g_ullCounter == 0
之后开始销毁的资源
发生这种情况是因为检查退出标志 (g_Quit) 并在此之后递增计数器 不是原子的 - 可能是它们之间的差距。
为了获得正确的解决方案,我们需要对标志+计数器进行原子访问。这并进行 运行 向下保护。对于 flag+counter 使用单个 LONG 变量(32 位),因为我们可以对其进行原子访问。 31 位用于计数器,1 位用于退出标志。 windows 解决方案使用 0 位作为标志(1 表示退出)和 [1..31] 位作为计数器。我使用 [0..30] 位作为计数器,31 位作为标志(0 表示退出)。寻找
看看 WaitOnAddress()
and WakeByAddressSingle()
/WakeByAddressAll()
中介绍的功能 Windows 8.
例如:
class CPendingOperationGuard final
{
public:
CPendingOperationGuard()
{
InterlockedIncrementAcquire(&m_ullCounter);
WakeByAddressAll(&m_ullCounter);
}
~CPendingOperationGuard()
{
InterlockedDecrementAcquire(&m_ullCounter);
WakeByAddressAll(&m_ullCounter);
}
static bool WaitForAll( DWORD dwTimeOut )
{
ULONGLONG Captured, Now, Deadline = GetTickCount64() + dwTimeOut;
DWORD TimeRemaining;
do
{
Captured = InterlockedExchangeAdd64((LONG64 volatile *)&m_ullCounter, 0);
if (Captured == 0) return true;
Now = GetTickCount64();
if (Now >= Deadline) return false;
TimeRemaining = static_cast<DWORD>(Deadline - Now);
}
while (WaitOnAddress(&m_ullCounter, &Captured, sizeof(ULONGLONG), TimeRemaining));
return false;
}
private:
static volatile ULONGLONG m_ullCounter;
};
Raymond Chen 写了一系列关于这些功能的博客文章:
WaitOnAddress lets you create a synchronization object out of any data variable, even a byte
Implementing a critical section in terms of WaitOnAddress
Spurious wakes, race conditions, and bogus FIFO claims: A peek behind the curtain of WaitOnAddress
Extending our critical section based on WaitOnAddress to support timeouts
Comparing WaitOnAddress with futexes (futexi? futexen?)
Creating a semaphore from WaitOnAddress
Creating a semaphore with a maximum count from WaitOnAddress
Creating a manual-reset event from WaitOnAddress
Creating an automatic-reset event from WaitOnAddress
A helper template function to wait for WaitOnAddress in a loop
我正在编写一个多线程程序,可以在单独的线程中执行一些任务。
有些操作需要在我的程序执行结束时等待它们。我已经为这样的 "important" 操作编写了简单的守卫:
class CPendingOperationGuard final
{
public:
CPendingOperationGuard()
{
InterlockedIncrementAcquire( &m_ullCounter );
}
~CPendingOperationGuard()
{
InterlockedDecrementAcquire( &m_ullCounter );
}
static bool WaitForAll( DWORD dwTimeOut )
{
// Here is a topic of my question
// Return false on timeout
// Return true if wait was successful
}
private:
static volatile ULONGLONG m_ullCounter;
};
用法很简单:
void ImportantTask()
{
CPendingOperationGuard guard;
// Do work
}
// ...
void StopExecution()
{
if(!CPendingOperationGuard::WaitForAll( 30000 )) {
// Handle error
}
}
问题是:如何有效地等到 m_ullCounter
变为零或直到超时。
我有两个想法:
要在另一个单独的线程中启动此函数并写入
WaitForSingleObject( hThread, dwTimeout )
:DWORD WINAPI WaitWorker( LPVOID ) { while(InterlockedCompareExchangeRelease( &m_ullCounter, 0, 0 )) ; }
但它会 "eat" 几乎 100% 的 CPU 时间 - 坏主意。
第二种思路是让其他线程启动:
DWORD WINAPI WaitWorker( LPVOID ) { while(InterlockedCompareExchangeRelease( &m_ullCounter, 0, 0 )) Sleep( 0 ); }
但它会将执行上下文切换到内核模式并返回 - 在可能的任务中成本太高。也是个坏主意
问题是:
如何在我的变量变为零之前执行几乎为零的开销?也许没有单独的线程...主要条件是支持超时停止等待。
也许有人可以为我的任务提出完全不同的想法 - 等待所有已注册的操作(例如在 WinAPI 的 ThreadPools - 它的 API 有,例如, WaitForThreadpoolWaitCallbacks
执行等待所有注册的任务).
PS:无法使用 ThreadPool API 重写我的代码 :(
你需要像 Run-Down Protection 这样的任务来代替 CPendingOperationGuard
在开始操作之前,您调用 ExAcquireRundownProtection
and only if it return TRUE - begin execute operation. at the end you must call ExReleaseRundownProtection
所以模式必须是下一个
if (ExAcquireRundownProtection(&RunRef)) {
do_operation();
ExReleaseRundownProtection(&RunRef);
}
当您想停止此过程并等待所有活动调用 do_operation();
完成时 - 您调用 ExWaitForRundownProtectionRelease
(而不是 WaitWorker
)
调用 ExWaitForRundownProtectionRelease
后,ExAcquireRundownProtection
例程将 return FALSE(因此在此之后不会启动新的操作)。 ExWaitForRundownProtectionRelease
等待 return 直到所有调用 ExReleaseRundownProtection
例程以释放先前获得的 运行-down 保护(因此当所有当前(如果存在)操作完成时)。当所有未完成的访问都完成后,ExWaitForRundownProtectionRelease
returns
不幸的是,这个 api 仅在内核模式下由系统实现,在用户模式下没有模拟。然而自己并不难实现这样的想法
这是我的例子:
enum RundownState {
v_complete = 0, v_init = 0x80000000
};
template<typename T>
class RundownProtection
{
LONG _Value;
public:
_NODISCARD BOOL IsRundownBegin()
{
return 0 <= _Value;
}
_NODISCARD BOOL AcquireRP()
{
LONG Value, NewValue;
if (0 > (Value = _Value))
{
do
{
NewValue = InterlockedCompareExchangeNoFence(&_Value, Value + 1, Value);
if (NewValue == Value) return TRUE;
} while (0 > (Value = NewValue));
}
return FALSE;
}
void ReleaseRP()
{
if (InterlockedDecrement(&_Value) == v_complete)
{
static_cast<T*>(this)->RundownCompleted();
}
}
void Rundown_l()
{
InterlockedBitTestAndResetNoFence(&_Value, 31);
}
void Rundown()
{
if (AcquireRP())
{
Rundown_l();
ReleaseRP();
}
}
RundownProtection(RundownState Value = v_init) : _Value(Value)
{
}
void Init()
{
_Value = v_init;
}
};
///////////////////////////////////////////////////////////////
class OperationGuard : public RundownProtection<OperationGuard>
{
friend RundownProtection<OperationGuard>;
HANDLE _hEvent;
void RundownCompleted()
{
SetEvent(_hEvent);
}
public:
OperationGuard() : _hEvent(0) {}
~OperationGuard()
{
if (_hEvent)
{
CloseHandle(_hEvent);
}
}
ULONG WaitComplete(ULONG dwMilliseconds = INFINITE)
{
return WaitForSingleObject(_hEvent, dwMilliseconds);
}
ULONG Init()
{
return (_hEvent = CreateEvent(0, 0, 0, 0)) ? NOERROR : GetLastError();
}
} g_guard;
//////////////////////////////////////////////
ULONG CALLBACK PendingOperationThread(void*)
{
while (g_guard.AcquireRP())
{
Sleep(1000);// do operation
g_guard.ReleaseRP();
}
return 0;
}
void demo()
{
if (g_guard.Init() == NOERROR)
{
if (HANDLE hThread = CreateThread(0, 0, PendingOperationThread, 0, 0, 0))
{
CloseHandle(hThread);
}
MessageBoxW(0, 0, L"UI Thread", MB_ICONINFORMATION|MB_OK);
g_guard.Rundown();
g_guard.WaitComplete();
}
}
为什么要等到 m_ullCounter
变成零还不够
如果我们从 m_ullCounter
中读到 0,这意味着只有此时没有活动操作。但是挂起的操作已经可以开始 在 我们检查 m_ullCounter == 0
之后。我们可以使用特殊标志(比如 bool g_bQuit
)并设置它。开始前的操作检查此标志,如果为真则不开始。但这无论如何不够
原始代码:
//工作线程
if (!g_bQuit) // (1)
{
// MessageBoxW(0, 0, L"simulate delay", MB_ICONWARNING);
InterlockedIncrement(&g_ullCounter); // (4)
// do operation
InterlockedDecrement(&g_ullCounter); // (5)
}
// 在这里等待所有操作完成
g_bQuit = true; // (2)
// wait on g_ullCounter == 0, how - not important
while (g_ullCounter) continue; // (3)
- 等待操作检查 g_bQuit 标志 (1) - 它仍然是假的,所以它 开始
- 工作线程被交换(使用 MessageBox 模拟)
- 我们设置 g_bQuit = true; // (2)
- we check/wait for
g_ullCounter == 0
, it 0 所以我们退出 (3) - 工作线程唤醒(return 来自 MessageBox)并递增
g_ullCounter
(4)
这里的问题是 operation 可以使用一些我们已经在 g_ullCounter == 0
发生这种情况是因为检查退出标志 (g_Quit) 并在此之后递增计数器 不是原子的 - 可能是它们之间的差距。
为了获得正确的解决方案,我们需要对标志+计数器进行原子访问。这并进行 运行 向下保护。对于 flag+counter 使用单个 LONG 变量(32 位),因为我们可以对其进行原子访问。 31 位用于计数器,1 位用于退出标志。 windows 解决方案使用 0 位作为标志(1 表示退出)和 [1..31] 位作为计数器。我使用 [0..30] 位作为计数器,31 位作为标志(0 表示退出)。寻找
看看 WaitOnAddress()
and WakeByAddressSingle()
/WakeByAddressAll()
中介绍的功能 Windows 8.
例如:
class CPendingOperationGuard final
{
public:
CPendingOperationGuard()
{
InterlockedIncrementAcquire(&m_ullCounter);
WakeByAddressAll(&m_ullCounter);
}
~CPendingOperationGuard()
{
InterlockedDecrementAcquire(&m_ullCounter);
WakeByAddressAll(&m_ullCounter);
}
static bool WaitForAll( DWORD dwTimeOut )
{
ULONGLONG Captured, Now, Deadline = GetTickCount64() + dwTimeOut;
DWORD TimeRemaining;
do
{
Captured = InterlockedExchangeAdd64((LONG64 volatile *)&m_ullCounter, 0);
if (Captured == 0) return true;
Now = GetTickCount64();
if (Now >= Deadline) return false;
TimeRemaining = static_cast<DWORD>(Deadline - Now);
}
while (WaitOnAddress(&m_ullCounter, &Captured, sizeof(ULONGLONG), TimeRemaining));
return false;
}
private:
static volatile ULONGLONG m_ullCounter;
};
Raymond Chen 写了一系列关于这些功能的博客文章:
WaitOnAddress lets you create a synchronization object out of any data variable, even a byte
Implementing a critical section in terms of WaitOnAddress
Spurious wakes, race conditions, and bogus FIFO claims: A peek behind the curtain of WaitOnAddress
Extending our critical section based on WaitOnAddress to support timeouts
Comparing WaitOnAddress with futexes (futexi? futexen?)
Creating a semaphore from WaitOnAddress
Creating a semaphore with a maximum count from WaitOnAddress
Creating a manual-reset event from WaitOnAddress
Creating an automatic-reset event from WaitOnAddress
A helper template function to wait for WaitOnAddress in a loop