Thread Join() 导致 Task.RunSynchronously 未完成
Thread Join() causes Task.RunSynchronously not to finish
调用 _thread.Join()
导致 GetConsumingEnumerable
循环卡在最后一个元素上。为什么会出现这种行为?
public abstract class ActorBase : IDisposable
{
private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>(new ConcurrentQueue<Task>());
private readonly Thread _thread;
private bool _isDisposed;
protected ActorBase()
{
_thread = new Thread(ProcessMessages);
_thread.Start();
}
protected void QueueTask(Task task)
{
if (_isDisposed)
{
throw new Exception("Actor was disposed, cannot queue task.");
}
_queue.Add(task);
}
private void ProcessMessages()
{
foreach (var task in _queue.GetConsumingEnumerable())
{
task.RunSynchronously();
}
}
public void Dispose()
{
_isDisposed = true;
_queue.CompleteAdding();
_thread.Join();
}
}
public class SampleActor : ActorBase
{
private string GetThreadStatus()
{
Thread.Sleep(500);
return string.Format("Running on thread {0}", Thread.CurrentThread.ManagedThreadId);
}
public async Task<string> GetThreadStatusAsync()
{
var task = new Task<string>(GetThreadStatus);
QueueTask(task);
return await task;
}
}
class Program
{
public static async Task Run()
{
using (var sa = new SampleActor())
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await sa.GetThreadStatusAsync());
}
}
}
public static void Main(string[] args)
{
Console.WriteLine("Main thread id {0}", Thread.CurrentThread.ManagedThreadId);
var task = Task.Run(async ()=> { await Run(); });
task.Wait();
}
}
这种方法的背景是我需要确保所有操作都在一个 OS 线程上执行,这将允许应用程序的一部分使用与主线程不同的凭据。
async-await
适用于延续。为了提高效率并减少调度这些延续,通常 运行 在完成前一个任务的同一线程上。
这意味着在您的情况下,您的特殊线程不仅 运行 执行任务,而且 运行 执行这些任务之后的所有延续(for
循环本身) .您可以通过打印线程 ID 看到:
using (var sa = new SampleActor())
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await sa.GetThreadStatusAsync());
Console.WriteLine("Continue on thread :" + Thread.CurrentThread.ManagedThreadId);
}
}
当 for
循环完成并且正在处理 SampleActor
时,您从您尝试加入的同一个线程调用 Thread.Join
,这样就会出现死锁。您的情况归结为:
public static void Main()
{
Thread thread = null;
thread = new Thread(() =>
{
Thread.Sleep(100);
thread.Join();
Console.WriteLine("joined");
});
thread.Start();
}
在 .Net 4.6 中,您可以使用 TaskCreationOptions.RunContinuationsAsynchronously
解决此问题,但在当前版本中,您可以指定默认值 TaskScheduler
:
public Task<string> GetThreadStatusAsync()
{
var task = new Task<string>(GetThreadStatus);
QueueTask(task);
return task.ContinueWith(task1 => task1.GetAwaiter().GetResult(), TaskScheduler.Default);
}
可能很想做一个简单的检查,看看您尝试 Join
的线程是否是 Thread.CurrentThread
,但那是错误的。
此外,我认为整个方法 - 调度和 运行宁冷 Task
具有自定义的、不符合 TPL 的调度程序的对象 - 是错误的.您应该使用 TPL 友好的任务调度程序,类似于 Stephen Toub 的 StaTaskScheduler
. Or run a custom SynchronizationContext
for your actor-serving thread (like Toub's AsyncPump
) 并使用 TaskScheduler.FromCurrentSynchronizationContext
和 Task.Factory.StartNew
来使用您的自定义调度程序调度任务(或者使用 Task.Start(TaskScheduler)
如果你必须处理冷任务。
这样,您将可以完全控制任务及其延续 运行 的位置,以及 task inlining。
调用 _thread.Join()
导致 GetConsumingEnumerable
循环卡在最后一个元素上。为什么会出现这种行为?
public abstract class ActorBase : IDisposable
{
private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>(new ConcurrentQueue<Task>());
private readonly Thread _thread;
private bool _isDisposed;
protected ActorBase()
{
_thread = new Thread(ProcessMessages);
_thread.Start();
}
protected void QueueTask(Task task)
{
if (_isDisposed)
{
throw new Exception("Actor was disposed, cannot queue task.");
}
_queue.Add(task);
}
private void ProcessMessages()
{
foreach (var task in _queue.GetConsumingEnumerable())
{
task.RunSynchronously();
}
}
public void Dispose()
{
_isDisposed = true;
_queue.CompleteAdding();
_thread.Join();
}
}
public class SampleActor : ActorBase
{
private string GetThreadStatus()
{
Thread.Sleep(500);
return string.Format("Running on thread {0}", Thread.CurrentThread.ManagedThreadId);
}
public async Task<string> GetThreadStatusAsync()
{
var task = new Task<string>(GetThreadStatus);
QueueTask(task);
return await task;
}
}
class Program
{
public static async Task Run()
{
using (var sa = new SampleActor())
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await sa.GetThreadStatusAsync());
}
}
}
public static void Main(string[] args)
{
Console.WriteLine("Main thread id {0}", Thread.CurrentThread.ManagedThreadId);
var task = Task.Run(async ()=> { await Run(); });
task.Wait();
}
}
这种方法的背景是我需要确保所有操作都在一个 OS 线程上执行,这将允许应用程序的一部分使用与主线程不同的凭据。
async-await
适用于延续。为了提高效率并减少调度这些延续,通常 运行 在完成前一个任务的同一线程上。
这意味着在您的情况下,您的特殊线程不仅 运行 执行任务,而且 运行 执行这些任务之后的所有延续(for
循环本身) .您可以通过打印线程 ID 看到:
using (var sa = new SampleActor())
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await sa.GetThreadStatusAsync());
Console.WriteLine("Continue on thread :" + Thread.CurrentThread.ManagedThreadId);
}
}
当 for
循环完成并且正在处理 SampleActor
时,您从您尝试加入的同一个线程调用 Thread.Join
,这样就会出现死锁。您的情况归结为:
public static void Main()
{
Thread thread = null;
thread = new Thread(() =>
{
Thread.Sleep(100);
thread.Join();
Console.WriteLine("joined");
});
thread.Start();
}
在 .Net 4.6 中,您可以使用 TaskCreationOptions.RunContinuationsAsynchronously
解决此问题,但在当前版本中,您可以指定默认值 TaskScheduler
:
public Task<string> GetThreadStatusAsync()
{
var task = new Task<string>(GetThreadStatus);
QueueTask(task);
return task.ContinueWith(task1 => task1.GetAwaiter().GetResult(), TaskScheduler.Default);
}
可能很想做一个简单的检查,看看您尝试 Join
的线程是否是 Thread.CurrentThread
,但那是错误的。
此外,我认为整个方法 - 调度和 运行宁冷 Task
具有自定义的、不符合 TPL 的调度程序的对象 - 是错误的.您应该使用 TPL 友好的任务调度程序,类似于 Stephen Toub 的 StaTaskScheduler
. Or run a custom SynchronizationContext
for your actor-serving thread (like Toub's AsyncPump
) 并使用 TaskScheduler.FromCurrentSynchronizationContext
和 Task.Factory.StartNew
来使用您的自定义调度程序调度任务(或者使用 Task.Start(TaskScheduler)
如果你必须处理冷任务。
这样,您将可以完全控制任务及其延续 运行 的位置,以及 task inlining。