等到变量变为零

Wait until a variable becomes zero

我正在编写一个多线程程序,可以在单独的线程中执行一些任务。

有些操作需要在我的程序执行结束时等待它们。我已经为这样的 "important" 操作编写了简单的守卫:

class CPendingOperationGuard final
{
public: 
    CPendingOperationGuard()
    {
        InterlockedIncrementAcquire( &m_ullCounter );
    }

    ~CPendingOperationGuard()
    {
        InterlockedDecrementAcquire( &m_ullCounter );
    }

    static bool WaitForAll( DWORD dwTimeOut )
    {
        // Here is a topic of my question
        // Return false on timeout
        // Return true if wait was successful
    }

private:
    static volatile ULONGLONG m_ullCounter;
};

用法很简单:

void ImportantTask()
{
    CPendingOperationGuard guard;
    // Do work
}

// ...

void StopExecution()
{
    if(!CPendingOperationGuard::WaitForAll( 30000 )) {
        // Handle error
    }
}

问题是:如何有效地等到 m_ullCounter 变为零或直到超时。

我有两个想法:

  1. 要在另一个单独的线程中启动此函数并写入 WaitForSingleObject( hThread, dwTimeout ):

    DWORD WINAPI WaitWorker( LPVOID )
    {
        while(InterlockedCompareExchangeRelease( &m_ullCounter, 0, 0 ))
            ;
    }
    

    但它会 "eat" 几乎 100% 的 CPU 时间 - 坏主意。

  2. 第二种思路是让其他线程启动:

    DWORD WINAPI WaitWorker( LPVOID )
    {
        while(InterlockedCompareExchangeRelease( &m_ullCounter, 0, 0 ))
            Sleep( 0 );
    }
    

    但它会将执行上下文切换到内核模式并返回 - 在可能的任务中成本太高。也是个坏主意

问题是:
如何在我的变量变为零之前执行几乎为零的开销?也许没有单独的线程...主要条件是支持超时停止等待。

也许有人可以为我的任务提出完全不同的想法 - 等待所有已注册的操作(例如在 WinAPI 的 ThreadPools - 它的 API 有,例如, WaitForThreadpoolWaitCallbacks 执行等待所有注册的任务).

PS:无法使用 ThreadPool API 重写我的代码 :(

你需要像 Run-Down Protection 这样的任务来代替 CPendingOperationGuard

在开始操作之前,您调用 ExAcquireRundownProtection and only if it return TRUE - begin execute operation. at the end you must call ExReleaseRundownProtection

所以模式必须是下一个

if (ExAcquireRundownProtection(&RunRef)) {
    do_operation();
    ExReleaseRundownProtection(&RunRef);
}

当您想停止此过程并等待所有活动调用 do_operation(); 完成时 - 您调用 ExWaitForRundownProtectionRelease(而不是 WaitWorker

调用 ExWaitForRundownProtectionRelease 后,ExAcquireRundownProtection 例程将 return FALSE(因此在此之后不会启动新的操作)。 ExWaitForRundownProtectionRelease 等待 return 直到所有调用 ExReleaseRundownProtection 例程以释放先前获得的 运行-down 保护(因此当所有当前(如果存在)操作完成时)。当所有未完成的访问都完成后,ExWaitForRundownProtectionRelease returns

不幸的是,这个 api 仅在内核模式下由系统实现,在用户模式下没有模拟。然而自己并不难实现这样的想法

这是我的例子:

enum RundownState {
    v_complete = 0, v_init = 0x80000000
};

template<typename T>
class RundownProtection
{
    LONG _Value;

public:

    _NODISCARD BOOL IsRundownBegin()
    {
        return 0 <= _Value;
    }

    _NODISCARD BOOL AcquireRP()
    {
        LONG Value, NewValue;

        if (0 > (Value = _Value))
        {
            do 
            {
                NewValue = InterlockedCompareExchangeNoFence(&_Value, Value + 1, Value);

                if (NewValue == Value) return TRUE;

            } while (0 > (Value = NewValue));
        }

        return FALSE;
    }

    void ReleaseRP()
    {
        if (InterlockedDecrement(&_Value) == v_complete)
        {
            static_cast<T*>(this)->RundownCompleted();
        }
    }

    void Rundown_l()
    {
        InterlockedBitTestAndResetNoFence(&_Value, 31);
    }

    void Rundown()
    {
        if (AcquireRP())
        {
            Rundown_l();
            ReleaseRP();
        }
    }

    RundownProtection(RundownState Value = v_init) : _Value(Value)
    {
    }

    void Init()
    {
        _Value = v_init;
    }
};

///////////////////////////////////////////////////////////////

class OperationGuard : public RundownProtection<OperationGuard>
{
    friend RundownProtection<OperationGuard>;

    HANDLE _hEvent;

    void RundownCompleted()
    {
        SetEvent(_hEvent);
    }

public:

    OperationGuard() : _hEvent(0) {}

    ~OperationGuard() 
    {
        if (_hEvent)
        {
            CloseHandle(_hEvent);
        }
    }

    ULONG WaitComplete(ULONG dwMilliseconds = INFINITE)
    {
        return WaitForSingleObject(_hEvent, dwMilliseconds);
    }

    ULONG Init()
    {
        return (_hEvent = CreateEvent(0, 0, 0, 0)) ? NOERROR : GetLastError();
    }
} g_guard;

//////////////////////////////////////////////

ULONG CALLBACK PendingOperationThread(void*)
{
    while (g_guard.AcquireRP())
    {
        Sleep(1000);// do operation
        g_guard.ReleaseRP();
    }

    return 0;
}

void demo()
{
    if (g_guard.Init() == NOERROR)
    {
        if (HANDLE hThread = CreateThread(0, 0, PendingOperationThread, 0, 0, 0))
        {
            CloseHandle(hThread);
        }

        MessageBoxW(0, 0, L"UI Thread", MB_ICONINFORMATION|MB_OK);

        g_guard.Rundown();

        g_guard.WaitComplete();
    }
}

为什么要等到 m_ullCounter 变成零还不够

如果我们从 m_ullCounter 中读到 0,这意味着只有此时没有活动操作。但是挂起的操作已经可以开始 我们检查 m_ullCounter == 0 之后。我们可以使用特殊标志(比如 bool g_bQuit)并设置它。开始前的操作检查此标志,如果为真则不开始。但这无论如何不够

原始代码:

//工作线程

if (!g_bQuit) // (1)
{
    // MessageBoxW(0, 0, L"simulate delay", MB_ICONWARNING);

    InterlockedIncrement(&g_ullCounter); // (4)
    // do operation
    InterlockedDecrement(&g_ullCounter); // (5)
}

// 在这里等待所有操作完成

    g_bQuit = true; // (2)

    // wait on g_ullCounter == 0, how - not important
    while (g_ullCounter) continue; // (3)
  • 等待操作检查 g_bQuit 标志 (1) - 它仍然是假的,所以它 开始
  • 工作线程被交换(使用 MessageBox 模拟)
  • 我们设置 g_bQuit = true; // (2)
  • we check/wait for g_ullCounter == 0, it 0 所以我们退出 (3)
  • 工作线程唤醒(return 来自 MessageBox)并递增 g_ullCounter (4)

这里的问题是 operation 可以使用一些我们已经在 g_ullCounter == 0

之后开始销毁的资源

发生这种情况是因为检查退出标志 (g_Quit) 并在此之后递增计数器 不是原子的 - 可能是它们之间的差距。

为了获得正确的解决方案,我们需要对标志+计数器进行原子访问。这并进行 运行 向下保护。对于 flag+counter 使用单个 LONG 变量(32 位),因为我们可以对其进行原子访问。 31 位用于计数器,1 位用于退出标志。 windows 解决方案使用 0 位作为标志(1 表示退出)和 [1..31] 位作为计数器。我使用 [0..30] 位作为计数器,31 位作为标志(0 表示退出)。寻找

看看 WaitOnAddress() and WakeByAddressSingle()/WakeByAddressAll() 中介绍的功能 Windows 8.

例如:

class CPendingOperationGuard final
{
public: 
    CPendingOperationGuard()
    {
        InterlockedIncrementAcquire(&m_ullCounter);
        Wake­By­Address­All(&m_ullCounter);
    }

    ~CPendingOperationGuard()
    {
        InterlockedDecrementAcquire(&m_ullCounter);
        Wake­By­Address­All(&m_ullCounter);
    }

    static bool WaitForAll( DWORD dwTimeOut )
    {
        ULONGLONG Captured, Now, Deadline = GetTickCount64() + dwTimeOut;
        DWORD TimeRemaining;
        do
        {
            Captured = InterlockedExchangeAdd64((LONG64 volatile *)&m_ullCounter, 0);
            if (Captured == 0) return true;
            Now = GetTickCount64();
            if (Now >= Deadline) return false;
            TimeRemaining = static_cast<DWORD>(Deadline - Now);
        }
        while (WaitOnAddress(&m_ullCounter, &Captured, sizeof(ULONGLONG), TimeRemaining));
        return false;
    }

private:
    static volatile ULONGLONG m_ullCounter;
};

Raymond Chen 写了一系列关于这些功能的博客文章:

WaitOnAddress lets you create a synchronization object out of any data variable, even a byte

Implementing a critical section in terms of WaitOnAddress

Spurious wakes, race conditions, and bogus FIFO claims: A peek behind the curtain of WaitOnAddress

Extending our critical section based on WaitOnAddress to support timeouts

Comparing WaitOnAddress with futexes (futexi? futexen?)

Creating a semaphore from WaitOnAddress

Creating a semaphore with a maximum count from WaitOnAddress

Creating a manual-reset event from WaitOnAddress

Creating an automatic-reset event from WaitOnAddress

A helper template function to wait for WaitOnAddress in a loop