如何在任务延续块中序列化线程执行?

How can I serialize thread execution across a task continuation block?

在下面的示例中,从异步任务中通过事件处理程序调用 HandleChangesAsync。

问题 - 有没有办法确保一次只有一个线程可以执行 HandleChangesAsync create_task + 任务继续块(即使任务继续块调用其他异步函数)?

请注意,我不能只使用同步原语,因为 HandleChangesAsync 可以 return 在异步操作完成之前。

void MyClass::DoStuffAsync()
{    
    WeakReference weakThis(this);
    create_task(DoMoreStuffAsync())
        .then([weakThis]())
    {
        auto strongThis = weakThis.Resolve<HomePromotionTemplateViewModel>();
        if (strongThis)
        {
            strongThis->RegisterForChanges();
            strongThis->HandleChangesAsync();
        }
    });
}

void MyClass::RegisterForChanges()
{    
    // Attach event handler to &MyClass::OnSomethingChanged
}

void MyClass::OnSomethingChanged()
{
    HandleChangesAsync();
}

void MyClass::HandleChangesAsync()
{    
    WeakReference weakThis(this);
    create_task(DoMoreCoolStuffAsync(m_needsProtection))
        .then([weakThis]()
    {
        // do async stuff 
        // update m_needsProtection
    })
        .then([weakThis]()
    {
        // do more async stuff 
        // update m_needsProtection
    });
}

假设您只想忽略重叠的请求(而不是将它们排队),这很容易用原子布尔值完成。可以将以下内容粘贴到新的 XAML 页面,然后检查输出 window。有两个线程,每个线程都在争先恐后地调用 DoStuff(),但它们中只有一个会同时执行——value_ 的值始终正好是 016工作完成;从来没有别的。如果多个线程同时执行工作,您可能会得到其他数字。

"do work" 中有一堆愚蠢的代码,但基本代码在 compare_exchange_strong() call 中。它基本上说 "I expect the value of busy_ to be false, in which case update busy_ to be true and return true (in which case, I'll start doing work). But if the value of busy_ wasn't already false then return false (and I won't do any work)"。 注意 if 里面的逻辑否定 ! :-)

我不是内存排序方面的专家,所以如果您 运行 处于紧密循环中,可能有更有效的方法(即传递显式 memory_order 值) ,但它应该是正确的并且应该足以满足正常的用户体验工作:

#include <atomic>
#include <ppltasks.h>
#include <string>

using namespace concurrency;

class Test
{
  std::atomic<bool> busy_;
  int value_;

  task<int> AddNumbersAsync(int x, int y)
  {
    return task<int>{[x, y]
    {
      Sleep(20);
      return x + y;
    }};
  }

  void CompleteStuff()
  {
    OutputDebugStringA("** Done with work; value is ");
    OutputDebugStringA(std::to_string(value_).c_str());
    OutputDebugStringA("\r\n");
    busy_ = false;
  }

public:
  Test()
    : busy_{ false }, value_{ 0 }
  {}

  void DoStuff()
  {
    // ---
    // This is where the magic happens...
    // ---
    bool expected{ false };
    if (!busy_.compare_exchange_strong(expected, true))
    {
      OutputDebugStringA("Work already in progress; bailing.\r\n");
      return;
    }

    OutputDebugStringA("Doing work...\r\n");
    value_ = 2;

    try
    {
      AddNumbersAsync(value_, value_).then([this](int i)
      {
        value_ = i;
        return AddNumbersAsync(value_, value_);
      }).then([this](int i)
      {
        value_ = i;
        return AddNumbersAsync(value_, value_);
      }).then([this](task<int> i)
      {
        // ---
        // Handle any async exceptions
        // ---
        try
        {
          value_ = i.get();
        }
        catch (...)
        {
          OutputDebugStringA("Oops, an async exception! Resetting value_\r\n");
          value_ = 0;
        }
        CompleteStuff();
      });
    }
    // ---
    // Handle any sync exceptions
    // ---
    catch (...)
    {
      OutputDebugStringA("Oops, an exception! Resetting value_\r\n");
      value_ = 0;
      CompleteStuff();
    }
  }
};

Test t;

task<void> TestTask1;
task<void> TestTask2;

MainPage::MainPage()
{
  InitializeComponent();
  TestTask1 = task<void>([]
  {
    for (int i = 0; i < 100; i++)
    {
      t.DoStuff();
      Sleep(20);
    }
  });

  TestTask1 = task<void>([]
  {
    for (int i = 0; i < 100; i++)
    {
      t.DoStuff();
      Sleep(30);
    }
  });
}

请注意,我们需要捕获同步异常(可能由任务链的第一部分引起的异常)和异步异常(由任务之一或其延续引起的异常)。我们通过使用 task<int> 类型的延续来捕获异步异常,然后通过在任务上显式调用 get() 来检查异常。在这两种情况下,我们都重置了 value_busy_ 标志。

您应该会看到如下输出;你可以看出多个线程相互竞争,因为有时 OutputDebugString 调用会交错:

Doing work...
Work already in progress; bailing.
Work already in progress; bailing.
Work already in progress; bailing.
Work already in progress; bailing.
** Done with work; value is Work already in progress; bailing.
16