如何在任务延续块中序列化线程执行?
How can I serialize thread execution across a task continuation block?
在下面的示例中,从异步任务中通过事件处理程序调用 HandleChangesAsync。
问题 - 有没有办法确保一次只有一个线程可以执行 HandleChangesAsync create_task + 任务继续块(即使任务继续块调用其他异步函数)?
请注意,我不能只使用同步原语,因为 HandleChangesAsync 可以 return 在异步操作完成之前。
void MyClass::DoStuffAsync()
{
WeakReference weakThis(this);
create_task(DoMoreStuffAsync())
.then([weakThis]())
{
auto strongThis = weakThis.Resolve<HomePromotionTemplateViewModel>();
if (strongThis)
{
strongThis->RegisterForChanges();
strongThis->HandleChangesAsync();
}
});
}
void MyClass::RegisterForChanges()
{
// Attach event handler to &MyClass::OnSomethingChanged
}
void MyClass::OnSomethingChanged()
{
HandleChangesAsync();
}
void MyClass::HandleChangesAsync()
{
WeakReference weakThis(this);
create_task(DoMoreCoolStuffAsync(m_needsProtection))
.then([weakThis]()
{
// do async stuff
// update m_needsProtection
})
.then([weakThis]()
{
// do more async stuff
// update m_needsProtection
});
}
假设您只想忽略重叠的请求(而不是将它们排队),这很容易用原子布尔值完成。可以将以下内容粘贴到新的 XAML 页面,然后检查输出 window。有两个线程,每个线程都在争先恐后地调用 DoStuff()
,但它们中只有一个会同时执行——value_
的值始终正好是 0
或 16
工作完成;从来没有别的。如果多个线程同时执行工作,您可能会得到其他数字。
"do work" 中有一堆愚蠢的代码,但基本代码在 compare_exchange_strong()
call 中。它基本上说 "I expect the value of busy_
to be false
, in which case update busy_
to be true
and return true
(in which case, I'll start doing work). But if the value of busy_
wasn't already false
then return false
(and I won't do any work)"。 注意 if
里面的逻辑否定 !
:-)
我不是内存排序方面的专家,所以如果您 运行 处于紧密循环中,可能有更有效的方法(即传递显式 memory_order
值) ,但它应该是正确的并且应该足以满足正常的用户体验工作:
#include <atomic>
#include <ppltasks.h>
#include <string>
using namespace concurrency;
class Test
{
std::atomic<bool> busy_;
int value_;
task<int> AddNumbersAsync(int x, int y)
{
return task<int>{[x, y]
{
Sleep(20);
return x + y;
}};
}
void CompleteStuff()
{
OutputDebugStringA("** Done with work; value is ");
OutputDebugStringA(std::to_string(value_).c_str());
OutputDebugStringA("\r\n");
busy_ = false;
}
public:
Test()
: busy_{ false }, value_{ 0 }
{}
void DoStuff()
{
// ---
// This is where the magic happens...
// ---
bool expected{ false };
if (!busy_.compare_exchange_strong(expected, true))
{
OutputDebugStringA("Work already in progress; bailing.\r\n");
return;
}
OutputDebugStringA("Doing work...\r\n");
value_ = 2;
try
{
AddNumbersAsync(value_, value_).then([this](int i)
{
value_ = i;
return AddNumbersAsync(value_, value_);
}).then([this](int i)
{
value_ = i;
return AddNumbersAsync(value_, value_);
}).then([this](task<int> i)
{
// ---
// Handle any async exceptions
// ---
try
{
value_ = i.get();
}
catch (...)
{
OutputDebugStringA("Oops, an async exception! Resetting value_\r\n");
value_ = 0;
}
CompleteStuff();
});
}
// ---
// Handle any sync exceptions
// ---
catch (...)
{
OutputDebugStringA("Oops, an exception! Resetting value_\r\n");
value_ = 0;
CompleteStuff();
}
}
};
Test t;
task<void> TestTask1;
task<void> TestTask2;
MainPage::MainPage()
{
InitializeComponent();
TestTask1 = task<void>([]
{
for (int i = 0; i < 100; i++)
{
t.DoStuff();
Sleep(20);
}
});
TestTask1 = task<void>([]
{
for (int i = 0; i < 100; i++)
{
t.DoStuff();
Sleep(30);
}
});
}
请注意,我们需要捕获同步异常(可能由任务链的第一部分引起的异常)和异步异常(由任务之一或其延续引起的异常)。我们通过使用 task<int>
类型的延续来捕获异步异常,然后通过在任务上显式调用 get()
来检查异常。在这两种情况下,我们都重置了 value_
和 busy_
标志。
您应该会看到如下输出;你可以看出多个线程相互竞争,因为有时 OutputDebugString
调用会交错:
Doing work...
Work already in progress; bailing.
Work already in progress; bailing.
Work already in progress; bailing.
Work already in progress; bailing.
** Done with work; value is Work already in progress; bailing.
16
在下面的示例中,从异步任务中通过事件处理程序调用 HandleChangesAsync。
问题 - 有没有办法确保一次只有一个线程可以执行 HandleChangesAsync create_task + 任务继续块(即使任务继续块调用其他异步函数)?
请注意,我不能只使用同步原语,因为 HandleChangesAsync 可以 return 在异步操作完成之前。
void MyClass::DoStuffAsync()
{
WeakReference weakThis(this);
create_task(DoMoreStuffAsync())
.then([weakThis]())
{
auto strongThis = weakThis.Resolve<HomePromotionTemplateViewModel>();
if (strongThis)
{
strongThis->RegisterForChanges();
strongThis->HandleChangesAsync();
}
});
}
void MyClass::RegisterForChanges()
{
// Attach event handler to &MyClass::OnSomethingChanged
}
void MyClass::OnSomethingChanged()
{
HandleChangesAsync();
}
void MyClass::HandleChangesAsync()
{
WeakReference weakThis(this);
create_task(DoMoreCoolStuffAsync(m_needsProtection))
.then([weakThis]()
{
// do async stuff
// update m_needsProtection
})
.then([weakThis]()
{
// do more async stuff
// update m_needsProtection
});
}
假设您只想忽略重叠的请求(而不是将它们排队),这很容易用原子布尔值完成。可以将以下内容粘贴到新的 XAML 页面,然后检查输出 window。有两个线程,每个线程都在争先恐后地调用 DoStuff()
,但它们中只有一个会同时执行——value_
的值始终正好是 0
或 16
工作完成;从来没有别的。如果多个线程同时执行工作,您可能会得到其他数字。
"do work" 中有一堆愚蠢的代码,但基本代码在 compare_exchange_strong()
call 中。它基本上说 "I expect the value of busy_
to be false
, in which case update busy_
to be true
and return true
(in which case, I'll start doing work). But if the value of busy_
wasn't already false
then return false
(and I won't do any work)"。 注意 if
里面的逻辑否定 !
:-)
我不是内存排序方面的专家,所以如果您 运行 处于紧密循环中,可能有更有效的方法(即传递显式 memory_order
值) ,但它应该是正确的并且应该足以满足正常的用户体验工作:
#include <atomic>
#include <ppltasks.h>
#include <string>
using namespace concurrency;
class Test
{
std::atomic<bool> busy_;
int value_;
task<int> AddNumbersAsync(int x, int y)
{
return task<int>{[x, y]
{
Sleep(20);
return x + y;
}};
}
void CompleteStuff()
{
OutputDebugStringA("** Done with work; value is ");
OutputDebugStringA(std::to_string(value_).c_str());
OutputDebugStringA("\r\n");
busy_ = false;
}
public:
Test()
: busy_{ false }, value_{ 0 }
{}
void DoStuff()
{
// ---
// This is where the magic happens...
// ---
bool expected{ false };
if (!busy_.compare_exchange_strong(expected, true))
{
OutputDebugStringA("Work already in progress; bailing.\r\n");
return;
}
OutputDebugStringA("Doing work...\r\n");
value_ = 2;
try
{
AddNumbersAsync(value_, value_).then([this](int i)
{
value_ = i;
return AddNumbersAsync(value_, value_);
}).then([this](int i)
{
value_ = i;
return AddNumbersAsync(value_, value_);
}).then([this](task<int> i)
{
// ---
// Handle any async exceptions
// ---
try
{
value_ = i.get();
}
catch (...)
{
OutputDebugStringA("Oops, an async exception! Resetting value_\r\n");
value_ = 0;
}
CompleteStuff();
});
}
// ---
// Handle any sync exceptions
// ---
catch (...)
{
OutputDebugStringA("Oops, an exception! Resetting value_\r\n");
value_ = 0;
CompleteStuff();
}
}
};
Test t;
task<void> TestTask1;
task<void> TestTask2;
MainPage::MainPage()
{
InitializeComponent();
TestTask1 = task<void>([]
{
for (int i = 0; i < 100; i++)
{
t.DoStuff();
Sleep(20);
}
});
TestTask1 = task<void>([]
{
for (int i = 0; i < 100; i++)
{
t.DoStuff();
Sleep(30);
}
});
}
请注意,我们需要捕获同步异常(可能由任务链的第一部分引起的异常)和异步异常(由任务之一或其延续引起的异常)。我们通过使用 task<int>
类型的延续来捕获异步异常,然后通过在任务上显式调用 get()
来检查异常。在这两种情况下,我们都重置了 value_
和 busy_
标志。
您应该会看到如下输出;你可以看出多个线程相互竞争,因为有时 OutputDebugString
调用会交错:
Doing work...
Work already in progress; bailing.
Work already in progress; bailing.
Work already in progress; bailing.
Work already in progress; bailing.
** Done with work; value is Work already in progress; bailing.
16