backgroundworker 中异步等待中的调解器死锁 - 如何检测线程调用自身
Mediator deadlock on async await within background worker - how to detect thread calling itself
我有一个调解器,我最近需要它在后台线程上一次同步一个消息调度,但它正在锁定,如下所示。
我 post 一个队列命令和 return 来自 TaskCompletionSource 的任务:
public Task<object> Send(object command, CancellationToken cancellationToken)
{
var item = new CommandItem() { Command = request, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
return item.Tcs.Task;
}
然后从后台工作人员那里获取它,并创建处理程序:
var item = await this.queue.Reader.ReadAsync(cancellationToken);
// work out command type snipped
var command = item.Command as LockMeGoodCommand;
var handler = new LockMeGoodCommandHandler();
var result = await handler.Handle(command, item.Ct);
item.Tcs.SetResult(result);
然后处理,当命令处理程序发送到命令处理程序内时,以下锁定(当使用后台线程时,但在线程内是可以的):
public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
{
Console.WriteLine(command.GetType().Name);
// this would get the result but will lock forever when using background worker bus implementation
var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
// perform some action based on the result - but we never get here
Console.WriteLine("otherResult is " + otherResult);
return 3;
}
** 问题和可能的解决方法 **
我相信我们可以避免死锁,方法是检测后台线程是否从其线程内 post 自身(通过命令处理程序,然后调用 Send() 到 post 一个新命令),如果是这样,它不应该使用任何线程机制(post 到命令队列或 TaskCompletionSource),而应该直接处理任务。
我已尝试检测该线程,但它不起作用,因此我在 var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken, true)
和 上方的处理程序中将手动标志 isSameThread 设置为 true 我可以确认它有效并且出现死锁避免.
此修复有任何注意事项吗?如何检测后台线程是否正在请求发送命令(线程如何检测自身)以及如何完成下面的代码(来自 DispatchOnBackgroundThread.Send()
以包含此自调用检测(所以我可以做去掉 isSameThread 标志)?
这似乎更复杂,因为每个 await 都会给出不同的线程 ID。
// in thread start we set the thread id of the background thread
this.workerThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
public Task<object> Send(object command, CancellationToken cancellationToken, bool isSameThread = false)
{
Console.WriteLine($"this.workerThreadId: {this.workerThreadId}, Thread.CurrentThread.ManagedThreadId: {Thread.CurrentThread.ManagedThreadId}");
// below doesnt work gives different numbers so i use flag instead
// this.workerThreadId == Thread.CurrentThread.ManagedThreadId
if (isSameThread == true)
{
if (command is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
return handler.Handle(boringCommand, cancellationToken).ContinueWith(t => (object)t);
}
else if (command is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this);
return handler.Handle(lockMeGoodCommand, cancellationToken).ContinueWith(t => (object)t);
}
else
throw new Exception("unknown");
}
else
{
var item = new CommandItem() { Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
this.queue.Writer.WriteAsync(item); // just write and immediatly return the cts
return item.Tcs.Task;
}
}
** 代码演示问题 **
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace TestDeadlock
{
class BoringCommand { }
class LockMeGoodCommand { }
class BoringCommandHandler
{
public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
{
Console.WriteLine(command.GetType().Name);
return Task.FromResult(1);
}
}
class LockMeGoodCommandHandler
{
private readonly DispatchOnBackgroundThread commandBus;
public LockMeGoodCommandHandler(DispatchOnBackgroundThread commandBus) => this.commandBus = commandBus;
public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
{
Console.WriteLine(command.GetType().Name);
// this locks forever
var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
Console.WriteLine("otherResult is " + otherResult);
return 3;
}
}
public class DispatchOnBackgroundThread
{
private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
private Task worker = null;
class CommandItem
{
public object Command { get; set; }
public CancellationToken Ct { get; set; }
public TaskCompletionSource<object> Tcs { get; set; }
}
public Task<object> Send(object command, CancellationToken cancellationToken)
{
var item = new CommandItem()
{ Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
return item.Tcs.Task;
}
public void Start(CancellationToken cancellationToken)
{
this.worker = Task.Factory.StartNew(async () =>
{
try
{
while (cancellationToken.IsCancellationRequested == false)
{
var item = await this.queue.Reader.ReadAsync(cancellationToken);
// simplified DI container magic to static invocation
if (item.Command is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
var result = await handler.Handle(boringCommand, item.Ct);
item.Tcs.SetResult(result);
}
if (item.Command is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this);
var result = await handler.Handle(lockMeGoodCommand, item.Ct);
item.Tcs.SetResult(result);
}
}
}
catch (TaskCanceledException) { }
},
TaskCreationOptions.LongRunning)
.Unwrap();
}
public async Task StopAsync()
{
this.queue.Writer.Complete();
await this.worker;
}
}
class Program
{
static async Task Main(string[] args)
{
var cts = new CancellationTokenSource();
var threadStrategy = new DispatchOnBackgroundThread();
threadStrategy.Start(cts.Token);
var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
var result2 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);
cts.Cancel();
await threadStrategy.StopAsync();
}
}
}
** 无需锁定即可工作的简单非线程中介实现 **
public class DispatchInCallingThread
{
public async Task<object> Send(object request, CancellationToken cancellationToken)
{
// simplified DI container magic to static invocation
if (request is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
return await handler.Handle(boringCommand, cancellationToken);
}
else if (request is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this);
return await handler.Handle(lockMeGoodCommand, cancellationToken);
}
else
throw new Exception("unknown");
}
}
死锁的原因很简单:
- 有一个代码循环(不是特定线程;见下文)负责处理队列。当它处理每个命令时,它
await
是该命令的处理程序。
- 有一个命令处理程序
await
另一个命令要处理。但是,这行不通,因为不会处理进一步的命令;在这个命令完成之前,代码循环不会使下一个命令出队。
换句话说,如果一次只能执行一个命令,则一个命令执行另一个命令在逻辑上是不可能的。
有几种可能的方法可以解决这个问题。我不推荐"re-entrant"方法;重入是许多微妙的逻辑错误的原因。我推荐的方法是以下之一:
- 更改
Send
语义,使其成为 "queue" 语义。这意味着不可能获得命令结果;结果必须通过一些中介作为消息发送。
- 让代码循环而不是
await
命令处理程序,允许它循环返回并获取下一个命令。这意味着它不再 "synchronize one at a time"。
- 将 "synchronize one at a time" 重新定义为 "one at a time but if it's
await
ing then it doesn't count as one"。在这种情况下,您可能可以使用 ConcurrentExclusiveSchedulerPair
或 Nito.AsyncEx.AsyncContext
之类的方法来 运行 该方法一次一个块。
旁注:LongRunning
并没有按照您的想法行事。 StartNew
is not async
-aware,因此 LongRunning
标志仅适用于第一个 await
之前的代码;之后,该 lambda 中的代码将在任意线程池线程上 运行(未设置 LongRunning
)。将 StartNew
替换为 Task.Run
将使代码更清晰。
感谢 Stephen 的回答和 Peter 的评论,说起来确实非常清楚,谢谢,
There is one code loop (not a specific thread; see below) that is
responsible for processing the queue. As it processes each command, it
awaits that command's handler.
There is a command handler that awaits another command to be handled.
However, this cannot work because no further commands will be
processed; the code loop will not dequeue the next command until this
one completes.
考虑到以上几点,我找到了一种无需任何线程黑客(检测 stack/re-entrance 深度等)或调度程序即可处理的方法。
在下面的示例中,我 "inject" 进入处理程序而不是循环调用 class,而是一种不同类型的命令处理程序调度程序,它不进行任何排队,而是直接在线程内处理.
下面是在线程循环中调用的,没有相互依赖:
public class DispatchInCallingThread: ICommandBus
{
public async Task<object> Send(object request, CancellationToken cancellationToken)
{
// simplified DI container magic to static invocation
if (request is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
return await handler.Handle(boringCommand, cancellationToken);
}
else if (request is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this);
return await handler.Handle(lockMeGoodCommand, cancellationToken);
}
else
throw new Exception("cough furball");
}
public void Start(CancellationToken cancellationToken) { }
public Task StopAsync() { return Task.CompletedTask; }
}
在后台线程中,这是对实例化命令处理程序的注入:
else if (item.Command is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
var result = await handler.Handle(lockMeGoodCommand, item.Ct);
item.Tcs.SetResult(result);
}
现在代码永远运行(将需要为正在设置的取消令牌源实施适当的关闭逻辑):
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace TestDeadlock
{
class BoringCommand { }
class LockMeGoodCommand { }
class BoringCommandHandler
{
public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
{
Console.WriteLine(command.GetType().Name);
return Task.FromResult(1);
}
}
class LockMeGoodCommandHandler
{
private readonly ICommandBus commandBus;
public LockMeGoodCommandHandler(ICommandBus commandBus) => this.commandBus = commandBus;
public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
{
Console.WriteLine(command.GetType().Name);
var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
var otherResult2 = await this.commandBus.Send(new BoringCommand(), cancellationToken);
return 3;
}
}
public interface ICommandBus
{
Task<object> Send(object request, CancellationToken cancellationToken);
void Start(CancellationToken cancellationToken);
Task StopAsync();
}
public class DispatchOnBackgroundThread : ICommandBus
{
private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
private Task worker = null;
private readonly DispatchInCallingThread dispatchInCallingThread = new DispatchInCallingThread();
class CommandItem
{
public object Command { get; set; }
public CancellationToken Ct { get; set; }
public TaskCompletionSource<object> Tcs { get; set; }
}
public Task<object> Send(object command, CancellationToken cancellationToken)
{
var item = new CommandItem() { Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
this.queue.Writer.WriteAsync(item, cancellationToken); // just write and immediatly return the cts
return item.Tcs.Task;
}
public void Start(CancellationToken cancellationToken)
{
var scheduler = new ConcurrentExclusiveSchedulerPair();
this.worker = Task.Factory.StartNew(async () =>
{
CommandItem item = null;
try
{
while (cancellationToken.IsCancellationRequested == false)
{
item = await this.queue.Reader.ReadAsync(cancellationToken);
// simplified DI container magic to static invocation
if (item.Command is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
var result = handler.Handle(boringCommand, item.Ct);
item.Tcs.SetResult(result);
}
else if (item.Command is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
var result = await handler.Handle(lockMeGoodCommand, item.Ct);
item.Tcs.SetResult(result);
}
else
throw new Exception("unknown");
}
}
catch (TaskCanceledException)
{
if (item != null)
item.Tcs.SetCanceled();
}
Console.WriteLine("exit background thread");
})
.Unwrap();
}
public async Task StopAsync()
{
this.queue.Writer.Complete();
await this.worker;
}
}
public class DispatchInCallingThread: ICommandBus
{
public async Task<object> Send(object request, CancellationToken cancellationToken)
{
// simplified DI container magic to static invocation
if (request is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
return await handler.Handle(boringCommand, cancellationToken);
}
else if (request is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this);
return await handler.Handle(lockMeGoodCommand, cancellationToken);
}
else
throw new Exception("unknown");
}
public void Start(CancellationToken cancellationToken) { }
public Task StopAsync() { return Task.CompletedTask; }
}
class Program
{
static async Task Main(string[] args)
{
await TestDispatchOnBackgroundThread();
}
static async Task TestDispatchOnBackgroundThread()
{
var cts = new CancellationTokenSource();
Console.CancelKeyPress += delegate {
Console.WriteLine("setting cts.Cancel()");
cts.Cancel();
};
var threadStrategy = new DispatchOnBackgroundThread();
threadStrategy.Start(cts.Token);
while (cts.IsCancellationRequested == false)
{
Console.WriteLine("***************** sending new batch ****************");
var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
var result3 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);
Thread.Sleep(1000);
}
await threadStrategy.StopAsync();
}
}
}
有关更多信息,依赖注入的实际实现在此处,它能够在工作线程内动态切换到线程内调度
我有一个调解器,我最近需要它在后台线程上一次同步一个消息调度,但它正在锁定,如下所示。
我 post 一个队列命令和 return 来自 TaskCompletionSource 的任务:
public Task<object> Send(object command, CancellationToken cancellationToken)
{
var item = new CommandItem() { Command = request, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
return item.Tcs.Task;
}
然后从后台工作人员那里获取它,并创建处理程序:
var item = await this.queue.Reader.ReadAsync(cancellationToken);
// work out command type snipped
var command = item.Command as LockMeGoodCommand;
var handler = new LockMeGoodCommandHandler();
var result = await handler.Handle(command, item.Ct);
item.Tcs.SetResult(result);
然后处理,当命令处理程序发送到命令处理程序内时,以下锁定(当使用后台线程时,但在线程内是可以的):
public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
{
Console.WriteLine(command.GetType().Name);
// this would get the result but will lock forever when using background worker bus implementation
var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
// perform some action based on the result - but we never get here
Console.WriteLine("otherResult is " + otherResult);
return 3;
}
** 问题和可能的解决方法 **
我相信我们可以避免死锁,方法是检测后台线程是否从其线程内 post 自身(通过命令处理程序,然后调用 Send() 到 post 一个新命令),如果是这样,它不应该使用任何线程机制(post 到命令队列或 TaskCompletionSource),而应该直接处理任务。
我已尝试检测该线程,但它不起作用,因此我在 var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken, true)
和 上方的处理程序中将手动标志 isSameThread 设置为 true 我可以确认它有效并且出现死锁避免.
此修复有任何注意事项吗?如何检测后台线程是否正在请求发送命令(线程如何检测自身)以及如何完成下面的代码(来自 DispatchOnBackgroundThread.Send()
以包含此自调用检测(所以我可以做去掉 isSameThread 标志)?
这似乎更复杂,因为每个 await 都会给出不同的线程 ID。
// in thread start we set the thread id of the background thread
this.workerThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
public Task<object> Send(object command, CancellationToken cancellationToken, bool isSameThread = false)
{
Console.WriteLine($"this.workerThreadId: {this.workerThreadId}, Thread.CurrentThread.ManagedThreadId: {Thread.CurrentThread.ManagedThreadId}");
// below doesnt work gives different numbers so i use flag instead
// this.workerThreadId == Thread.CurrentThread.ManagedThreadId
if (isSameThread == true)
{
if (command is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
return handler.Handle(boringCommand, cancellationToken).ContinueWith(t => (object)t);
}
else if (command is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this);
return handler.Handle(lockMeGoodCommand, cancellationToken).ContinueWith(t => (object)t);
}
else
throw new Exception("unknown");
}
else
{
var item = new CommandItem() { Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
this.queue.Writer.WriteAsync(item); // just write and immediatly return the cts
return item.Tcs.Task;
}
}
** 代码演示问题 **
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace TestDeadlock
{
class BoringCommand { }
class LockMeGoodCommand { }
class BoringCommandHandler
{
public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
{
Console.WriteLine(command.GetType().Name);
return Task.FromResult(1);
}
}
class LockMeGoodCommandHandler
{
private readonly DispatchOnBackgroundThread commandBus;
public LockMeGoodCommandHandler(DispatchOnBackgroundThread commandBus) => this.commandBus = commandBus;
public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
{
Console.WriteLine(command.GetType().Name);
// this locks forever
var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
Console.WriteLine("otherResult is " + otherResult);
return 3;
}
}
public class DispatchOnBackgroundThread
{
private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
private Task worker = null;
class CommandItem
{
public object Command { get; set; }
public CancellationToken Ct { get; set; }
public TaskCompletionSource<object> Tcs { get; set; }
}
public Task<object> Send(object command, CancellationToken cancellationToken)
{
var item = new CommandItem()
{ Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
return item.Tcs.Task;
}
public void Start(CancellationToken cancellationToken)
{
this.worker = Task.Factory.StartNew(async () =>
{
try
{
while (cancellationToken.IsCancellationRequested == false)
{
var item = await this.queue.Reader.ReadAsync(cancellationToken);
// simplified DI container magic to static invocation
if (item.Command is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
var result = await handler.Handle(boringCommand, item.Ct);
item.Tcs.SetResult(result);
}
if (item.Command is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this);
var result = await handler.Handle(lockMeGoodCommand, item.Ct);
item.Tcs.SetResult(result);
}
}
}
catch (TaskCanceledException) { }
},
TaskCreationOptions.LongRunning)
.Unwrap();
}
public async Task StopAsync()
{
this.queue.Writer.Complete();
await this.worker;
}
}
class Program
{
static async Task Main(string[] args)
{
var cts = new CancellationTokenSource();
var threadStrategy = new DispatchOnBackgroundThread();
threadStrategy.Start(cts.Token);
var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
var result2 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);
cts.Cancel();
await threadStrategy.StopAsync();
}
}
}
** 无需锁定即可工作的简单非线程中介实现 **
public class DispatchInCallingThread
{
public async Task<object> Send(object request, CancellationToken cancellationToken)
{
// simplified DI container magic to static invocation
if (request is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
return await handler.Handle(boringCommand, cancellationToken);
}
else if (request is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this);
return await handler.Handle(lockMeGoodCommand, cancellationToken);
}
else
throw new Exception("unknown");
}
}
死锁的原因很简单:
- 有一个代码循环(不是特定线程;见下文)负责处理队列。当它处理每个命令时,它
await
是该命令的处理程序。 - 有一个命令处理程序
await
另一个命令要处理。但是,这行不通,因为不会处理进一步的命令;在这个命令完成之前,代码循环不会使下一个命令出队。
换句话说,如果一次只能执行一个命令,则一个命令执行另一个命令在逻辑上是不可能的。
有几种可能的方法可以解决这个问题。我不推荐"re-entrant"方法;重入是许多微妙的逻辑错误的原因。我推荐的方法是以下之一:
- 更改
Send
语义,使其成为 "queue" 语义。这意味着不可能获得命令结果;结果必须通过一些中介作为消息发送。 - 让代码循环而不是
await
命令处理程序,允许它循环返回并获取下一个命令。这意味着它不再 "synchronize one at a time"。 - 将 "synchronize one at a time" 重新定义为 "one at a time but if it's
await
ing then it doesn't count as one"。在这种情况下,您可能可以使用ConcurrentExclusiveSchedulerPair
或Nito.AsyncEx.AsyncContext
之类的方法来 运行 该方法一次一个块。
旁注:LongRunning
并没有按照您的想法行事。 StartNew
is not async
-aware,因此 LongRunning
标志仅适用于第一个 await
之前的代码;之后,该 lambda 中的代码将在任意线程池线程上 运行(未设置 LongRunning
)。将 StartNew
替换为 Task.Run
将使代码更清晰。
感谢 Stephen 的回答和 Peter 的评论,说起来确实非常清楚,谢谢,
There is one code loop (not a specific thread; see below) that is responsible for processing the queue. As it processes each command, it awaits that command's handler.
There is a command handler that awaits another command to be handled. However, this cannot work because no further commands will be processed; the code loop will not dequeue the next command until this one completes.
考虑到以上几点,我找到了一种无需任何线程黑客(检测 stack/re-entrance 深度等)或调度程序即可处理的方法。
在下面的示例中,我 "inject" 进入处理程序而不是循环调用 class,而是一种不同类型的命令处理程序调度程序,它不进行任何排队,而是直接在线程内处理.
下面是在线程循环中调用的,没有相互依赖:
public class DispatchInCallingThread: ICommandBus
{
public async Task<object> Send(object request, CancellationToken cancellationToken)
{
// simplified DI container magic to static invocation
if (request is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
return await handler.Handle(boringCommand, cancellationToken);
}
else if (request is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this);
return await handler.Handle(lockMeGoodCommand, cancellationToken);
}
else
throw new Exception("cough furball");
}
public void Start(CancellationToken cancellationToken) { }
public Task StopAsync() { return Task.CompletedTask; }
}
在后台线程中,这是对实例化命令处理程序的注入:
else if (item.Command is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
var result = await handler.Handle(lockMeGoodCommand, item.Ct);
item.Tcs.SetResult(result);
}
现在代码永远运行(将需要为正在设置的取消令牌源实施适当的关闭逻辑):
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace TestDeadlock
{
class BoringCommand { }
class LockMeGoodCommand { }
class BoringCommandHandler
{
public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
{
Console.WriteLine(command.GetType().Name);
return Task.FromResult(1);
}
}
class LockMeGoodCommandHandler
{
private readonly ICommandBus commandBus;
public LockMeGoodCommandHandler(ICommandBus commandBus) => this.commandBus = commandBus;
public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
{
Console.WriteLine(command.GetType().Name);
var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
var otherResult2 = await this.commandBus.Send(new BoringCommand(), cancellationToken);
return 3;
}
}
public interface ICommandBus
{
Task<object> Send(object request, CancellationToken cancellationToken);
void Start(CancellationToken cancellationToken);
Task StopAsync();
}
public class DispatchOnBackgroundThread : ICommandBus
{
private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
private Task worker = null;
private readonly DispatchInCallingThread dispatchInCallingThread = new DispatchInCallingThread();
class CommandItem
{
public object Command { get; set; }
public CancellationToken Ct { get; set; }
public TaskCompletionSource<object> Tcs { get; set; }
}
public Task<object> Send(object command, CancellationToken cancellationToken)
{
var item = new CommandItem() { Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
this.queue.Writer.WriteAsync(item, cancellationToken); // just write and immediatly return the cts
return item.Tcs.Task;
}
public void Start(CancellationToken cancellationToken)
{
var scheduler = new ConcurrentExclusiveSchedulerPair();
this.worker = Task.Factory.StartNew(async () =>
{
CommandItem item = null;
try
{
while (cancellationToken.IsCancellationRequested == false)
{
item = await this.queue.Reader.ReadAsync(cancellationToken);
// simplified DI container magic to static invocation
if (item.Command is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
var result = handler.Handle(boringCommand, item.Ct);
item.Tcs.SetResult(result);
}
else if (item.Command is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
var result = await handler.Handle(lockMeGoodCommand, item.Ct);
item.Tcs.SetResult(result);
}
else
throw new Exception("unknown");
}
}
catch (TaskCanceledException)
{
if (item != null)
item.Tcs.SetCanceled();
}
Console.WriteLine("exit background thread");
})
.Unwrap();
}
public async Task StopAsync()
{
this.queue.Writer.Complete();
await this.worker;
}
}
public class DispatchInCallingThread: ICommandBus
{
public async Task<object> Send(object request, CancellationToken cancellationToken)
{
// simplified DI container magic to static invocation
if (request is BoringCommand boringCommand)
{
var handler = new BoringCommandHandler();
return await handler.Handle(boringCommand, cancellationToken);
}
else if (request is LockMeGoodCommand lockMeGoodCommand)
{
var handler = new LockMeGoodCommandHandler(this);
return await handler.Handle(lockMeGoodCommand, cancellationToken);
}
else
throw new Exception("unknown");
}
public void Start(CancellationToken cancellationToken) { }
public Task StopAsync() { return Task.CompletedTask; }
}
class Program
{
static async Task Main(string[] args)
{
await TestDispatchOnBackgroundThread();
}
static async Task TestDispatchOnBackgroundThread()
{
var cts = new CancellationTokenSource();
Console.CancelKeyPress += delegate {
Console.WriteLine("setting cts.Cancel()");
cts.Cancel();
};
var threadStrategy = new DispatchOnBackgroundThread();
threadStrategy.Start(cts.Token);
while (cts.IsCancellationRequested == false)
{
Console.WriteLine("***************** sending new batch ****************");
var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
var result3 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);
Thread.Sleep(1000);
}
await threadStrategy.StopAsync();
}
}
}
有关更多信息,依赖注入的实际实现在此处