如何让它 运行 同步?

How to make it run sync?

你好,我想同步两个线程,一个递增一个变量,另一个递减它。 我想要的结果如下所示:

Thread #0 j = 1

Thread #1 j = 0

Thread #0 j = 1

Thread #1 j = 0

等等......但我的代码有时会这样工作,在某些情况下它会打印出非常奇怪的值。我想我在某处有一些未定义的行为,但我无法弄清楚到底发生了什么。

我的代码包含在一个 HANDLE ghMutex 中,其中包含我的互斥锁的处理程序:

我的主要功能:

int main(void)
{
    HANDLE aThread[THREADCOUNT];

    ghMutex = CreateMutex(NULL, FALSE, NULL);             

    aThread[0] = (HANDLE)_beginthreadex(NULL, 0, &inc, NULL, CREATE_SUSPENDED, 0);
    aThread[1] = (HANDLE)_beginthreadex(NULL, 0, &dec, NULL, CREATE_SUSPENDED, 0);

    ResumeThread(aThread[0]);
    ResumeThread(aThread[1]);

    WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);

    printf("j = %d\n", j);

    for (int i = 0; i < THREADCOUNT; i++)
        CloseHandle(aThread[i]);

    CloseHandle(ghMutex);

    return 0;
}

公司功能:

unsigned int __stdcall inc(LPVOID)
{
    for (volatile int i = 0; i < MAX; ++i)
    {
        WaitForSingleObject(
            ghMutex,    // handle to mutex
            INFINITE);  // no time-out interval

            j++;
            printf("Thread %d j = %d\n", GetCurrentThreadId(), j);
            ReleaseMutex(ghMutex);

    }
    _endthread();

    return TRUE;
}

12 月函数:

unsigned int __stdcall dec(void*)
{
    for (volatile int i = 0; i < MAX; ++i)
    {
        WaitForSingleObject(
            ghMutex,    // handle to mutex
            INFINITE);  // no time-out interval

        j--;
        printf("Thread %d j = %d\n", GetCurrentThreadId(), j);
        ReleaseMutex(ghMutex);
    }
    _endthread();

    return TRUE;
}

我需要 std c++98 中的 win api 解决方案。

互斥锁不是同步两个线程的正确工具,它是用来保护资源的。您确实有一个受互斥体保护的资源 j,但是未定义线程获得锁的顺序,因此您可能会遇到 dec 在 [=12= 之前被调用多次的情况]有机会运行.

如果你想同步线程的顺序,你将不得不使用另一个同步原语,例如信号量。例如,您可以在 inc 中递增信号量并在 dec 中递减信号量。这将是经典的生产者 - 消费者关系,当信号量达到其最大值时生产者将停止,而消费者将等待项目消费。

抱歉,我没有提供 WinAPI C++98 解决方案,因为这很愚蠢,但我希望我为您指明了正确的方向。

windows互斥对象保证独占所有权,但不关心所有权顺序。这样同一个线程可以连续捕获几次,而其他线程将等待。

对于您的任务,您需要向另一个线程发出信号,当您的任务完成时,然后等待来自另一个线程的信号。例如,对于此任务可以使用事件对。线程 (i) 信号事件 (1-i) 并等待事件 (i)。用于优化而不是 2 个调用 -

SetEvent(e[1-i]); WaitForSingleObject(e[i], INFINITE);

我们可以使用单调用SignalObjectAndWait

SignalObjectAndWait(e[1-i], e[i], INFINITE, FALSE)

当然,循环的开始和结束需要特别小心。对于 inc

    HANDLE hObjectToSignal  = _hEvent[1], hObjectToWaitOn  = _hEvent[0];

    for (;;)
    {
        _shared_value++;

        if (!--n)
        {
            SetEvent(hObjectToSignal);
            break;
        }

        SignalObjectAndWait(hObjectToSignal, hObjectToWaitOn, INFINITE, FALSE);
    }

dec

    HANDLE hObjectToSignal  = _hEvent[0], hObjectToWaitOn  = _hEvent[1];

    WaitForSingleObject(hObjectToWaitOn, INFINITE);
    for (;;)
    {
        --_shared_value;

        if (!--n)
        {
            break;
        }

        SignalObjectAndWait(hObjectToSignal, hObjectToWaitOn, INFINITE, FALSE);
    }

if 编写完整测试,并进行错误检查

struct Task 
{
    HANDLE _hEvent[4];
    ULONG _n;
    LONG _iTasks;
    LONG _shared_value;

    Task()
    {
        RtlZeroMemory(this, sizeof(*this));
    }

    ~Task()
    {
        ULONG n = RTL_NUMBER_OF(_hEvent);
        do 
        {
            if (HANDLE hEvent = _hEvent[--n]) CloseHandle(hEvent);
        } while (n);
    }

    ULONG WaitTaskEnd()
    {
        return WaitForSingleObject(_hEvent[2], INFINITE);
    }

    ULONG WaitTaskReady()
    {
        return WaitForSingleObject(_hEvent[3], INFINITE);
    }

    void SetTaskReady()
    {
        SetEvent(_hEvent[3]);
    }

    void End()
    {
        if (!InterlockedDecrement(&_iTasks)) SetEvent(_hEvent[2]);
    }

    void Begin()
    {
        InterlockedIncrementNoFence(&_iTasks);
    }

    static ULONG WINAPI IncThread(PVOID p)
    {
        return reinterpret_cast<Task*>(p)->Inc(), 0;
    }

    void Inc()
    {
        if (WaitTaskReady() == WAIT_OBJECT_0)
        {
            if (ULONG n = _n)
            {
                HANDLE hObjectToSignal  = _hEvent[1], hObjectToWaitOn  = _hEvent[0];

                for (;;)
                {
                    if (_shared_value) __debugbreak();

                    if (n < 17) DbgPrint("Inc(%u)\n", n);

                    _shared_value++;

                    if (!--n)
                    {
                        SetEvent(hObjectToSignal);
                        break;
                    }

                    if (SignalObjectAndWait(hObjectToSignal, hObjectToWaitOn, INFINITE, FALSE) != WAIT_OBJECT_0)
                    {
                        break;
                    }
                }
            }
        }

        End();
    }

    static ULONG WINAPI DecThread(PVOID p)
    {
        return reinterpret_cast<Task*>(p)->Dec(), 0;
    }

    void Dec()
    {
        if (WaitTaskReady() == WAIT_OBJECT_0)
        {
            if (ULONG n = _n)
            {
                HANDLE hObjectToSignal  = _hEvent[0], hObjectToWaitOn  = _hEvent[1];

                if (WaitForSingleObject(hObjectToWaitOn, INFINITE) == WAIT_OBJECT_0)
                {
                    for (;;)
                    {
                        --_shared_value;

                        if (_shared_value) __debugbreak();

                        if (n < 17) DbgPrint("Dec(%u)\n", n);

                        if (!--n)
                        {
                            break;
                        }

                        if (SignalObjectAndWait(hObjectToSignal, hObjectToWaitOn, INFINITE, FALSE) != WAIT_OBJECT_0)
                        {
                            break;
                        }
                    }
                }
            }
        }

        End();
    }

    ULONG Create()
    {
        ULONG n = RTL_NUMBER_OF(_hEvent);
        do 
        {
            if (HANDLE hEvent = CreateEventW(0, n > 2, 0, 0)) _hEvent[--n] = hEvent; 
            else return GetLastError();
        } while (n);

        return NOERROR;
    }

    ULONG Start()
    {
        static PTHREAD_START_ROUTINE aa[] = { IncThread, DecThread };

        ULONG n = RTL_NUMBER_OF(aa);

        do 
        {
            Begin();
            if (HANDLE hThread = CreateThread(0, 0, aa[--n], this, 0, 0))
            {
                CloseHandle(hThread);
            }
            else
            {
                n = GetLastError();
                End();
                return n;
            }
        } while (n);

        return NOERROR;
    }

    ULONG Start(ULONG n)
    {
        _iTasks = 1;

        ULONG dwError = Start();

        _n = dwError ? 0 : n;

        SetTaskReady();

        End();

        return dwError;
    }
};

void TaskTest(ULONG n)
{
    Task task;
    if (task.Create() == NOERROR)
    {
        task.Start(n);
        task.WaitTaskEnd();
    }
}

请注意,没有任何意义将局部变量(只能从单线程访问,不会被任何中断等访问)声明为 volatile

还有我们写代码的时候,比如:

// thread #1
write_shared_data();
SetEvent(hEvent);

// thread #2
WaitForSingleObject(hEvent, INFINITE);
read_shared_data();

在 SetEvent(hEvent) 里面;是原子写入具有释放语义的事件状态(当然确实更强)和内部等待事件功能 - 原子读取它的状态不仅仅是获取语义。结果,线程 #1 在 SetEvent 之前写入内存的所有内容 - 在 Wait for 事件之后对线程 #2 可见(如果等待作为线程 #1 调用 Set 的结果完成)