如何在控制台应用程序中调用它的同一线程上继续异步方法调用(SynchronizationContext?)

How to continue async method call on the same thread on which it was called in console application (SynchronizationContext?)

我将 MassTransit(带有 RabbitMQ)与 NHibernate 一起用于 C# 控制台应用程序中的消息队列(实际上作为 Windows 服务托管,使用 TopShelf)。

我们的应用程序的流程包括:

  1. 监视扫描文档的文件共享(使用 FileSystemWatcher)
  2. 对文件进行一些处理(将其移动到新的文件位置,将记录插入我们的数据库)
  3. 对文档进行 OCR 并读取它以查看它是否包含某些单词
  4. 修改我们的数据库记录以反映步骤 3 的结果。

从高水平的技术实施来看,这项工作的步骤是:

  1. 正在 FileSystemWatcher 处理其 Created 事件的线程中进行初始处理。完成后,向我们的队列发布消息以执行 OCR 和文字检查。
  2. MassTransit 处理消息,创建一个新的生命周期范围,并实例化一个消费者来处理它
  3. 消费者通过调用 IOCRService 实现来执行 OCR。完成后,在同一个消费者中,我们(从数据库中)获取我们想要搜索的词,然后阅读文档文本以找到这些词。
  4. Post 返回给 MassTransit/RabbitMQ 的响应消息,此消息的消费者根据是否找到任何单词来修改我们的数据库条目。

我遇到的问题是在上述步骤 3 中。这大约是我们旧代码的样子:

public class Consumer
{
   public Task Handle(message)
   {
      _Ocr.DoOcr(message); //Performed in-process
      var response = DoDirtyWordCheck(message);
      _Publisher.Publish(response);
      return Task.CompletedTask;
   }

   private CheckResponse DoDirtyWordCheck(Message message)
   {
      var wordsToFind = _DB.FindWords();
      var response = _Checker.SearchForWords(message);
   }
}

我对这个流程所做的重大改变是,OCR 步骤被移出进程,放入微服务中,并通过使用 HttpClient 调用微服务来调用.新代码基本相同:

public class Consumer
{
   public async Task Handle(message)
   {
      await _Ocr.DoOcr(message); //calls out to micro-service and awaits the result; method on interface //changed from void return to Task return
      var response = DoDirtyWordCheck(message);
      _Publisher.Publish(response);
   }

   private CheckResponse DoDirtyWordCheck(Message message)
   {
      var wordsToFind = _DB.FindWords();  //Fails here
      var response = _Checker.SearchForWords(message);
   }
}

然而,我一直在发现的是,这现在经常在调用 _DB.FindWords() 时失败。好吧,事实证明,这个调用发生在一个线程上,这个线程不同于我的生命周期范围开始时所在的线程,它与调用 await _OCR.DoOCR(); 的线程是同一个线程。为什么这是个问题?由于 NHibernate Session 不是线程安全的,我们的 DB 层(非常复杂)确保操作只能在创建它的线程上执行。

现在,我以前对 async/await 的理解是,额外的线程不会做任何“技巧”,这样我就不必担心代码必须是线程安全的才能await它。

然而,在深入研究 async/await 并对它的工作原理有了一些了解之后,似乎有些工作实际上 在 ThreadPool 上完成的线程(这是实际等待的工作还是等待之后的继续,我仍然不确定),这与控制台应用程序中没有 SynchronizationContext 的事实有关确定这个过程是如何完成的;然而,在 WPF 应用程序中,在 UI 线程上等待的工作将必然在同一个 UI 线程上继续(这是我期望在所有上下文中表现出的那种行为)。

所以,这让我想到了我的最终问题:我如何确保需要继续的代码在我调用 await 之后在同一个线程上继续?

我知道我上面的代码流可以以防止这种情况发生的方式进行重组,例如,我可以将 OCR 操作和脏字检查操作分解为两个单独的消费者,以便每个操作都有自己独特的 context/lifetime 范围,可能还有其他任何数量的东西。然而,任何需要这种重组的解决方案对我来说都是有问题的,因为它似乎表明 async/await 是一个比乍一看更容易泄漏的抽象。

这段代码(它可能是库代码,在任何上下文中都可能是 运行)似乎不应该依赖于任何与线程模​​型有关的东西,只是因为现在正在等待此链中的一个调用。似乎它应该“正常工作”。在我的应用程序中,与此用例的生命周期、范围和线程有关的所有内容都应在消费者级别是原子的,并且我们希望通过内置于 MassTransit 中的处理来处理(并且已经处理) ,以及我们围绕它构建的代码。

现在,在我看来,我可以设置的 SynchronizationContext(或 TaskSheduler?)中存在一个可能的解决方案,这就是处理此类工作的原因WPF 应用程序或 WinForms 应用程序,但在控制台应用程序中根本不使用(默认情况下)。不幸的是,在控制台应用程序中做任何事情似乎不再是一个常见的用例,更不用说可能提出这种要求的事情了,所以我真的找不到任何预先存在的和经过良好测试的解决方案来做这种事情的事情。此外,由于任何意想不到的影响,我对 async-await 实现的深层内部结构了解得还不够多,所以我会很乐意自己动手​​解决这个问题。

所以,有人可以帮我解决这个问题吗?我的基本假设是否有任何问题让我以错误的方式思考这个问题?还是程序本身整体design/structure真的有问题?

Why is that a problem? Because of the fact that NHibernate Sessions are not thread-safe, our DB layer (which is very complicated) ensures that operations can only be performed on the thread on which it was created.

我不熟悉 NHibernate,但我的第一直觉是查看更新版本的 NHibernate 是否支持 async-感知会话。如果是这样,那么您的解决方案可以像版本升级一样简单。

However, after doing a deep dive into async/await and coming to some understanding about how it works, it seems that some work actually is being done on a ThreadPool thread (whether this is the actual awaited work or the continuation after the await, I'm still not exactly sure)

我通常建议人们从我的 async intro 开始,它试图成为“开始使用 async 所需知道的一切”。 post 中描述的几点应该澄清一下:

  1. async 方法开始同步执行。换句话说,await Func();var task = Func(); await task;大致相同。所以等待的方法被调用首先然后它的任务returns被等待。
  2. 每个 await 捕获一个“上下文”,即当前的 SynchronizationContextTaskScheduler。如果没有上下文,这默认为线程池调度程序。此“上下文”用于在 await 完成后恢复 async 方法。

this has something to do with the fact that in a console application, there is no SynchronizationContext

是的,没错。控制台应用程序没有 SynchronizationContext,因此它们默认使用线程池调度程序。

work that is awaited on the UI thread will be necessarily continued on that same UI thread (which is the sort of behavior I was expecting to exhibited in all contexts).

A SynchronizationContext 确定代码将 运行 的位置; it does not necessarily resume on the same thread。一个反例是 pre-Core ASP.NET SynchronizationContext,它可以在任何线程池线程上恢复。

So, this brings me to my ultimate question: How can I ensure that the code that needs to continue, after my call to await, continues on that same thread?

提供你自己的 SynchronizationContext。或者只是阻止异步方法调用。

async/await is a leakier abstraction than it would seem to be at first glance.

async/await 不是抽象。这是语法糖。如果它是一个抽象,那么是的,它会非常漏洞,因为在大多数情况下你应该去 "async all the way".

Unfortunately, it seems that it's no longer a common use case to be doing anything in a console application, let alone something that might pose this requirement

控制台应用程序开始使用 .NET Core 卷土重来。但是,很少有 使用线程仿射组件的控制台应用程序。

I can't really find any pre-existing and well-tested solutions that do this sort of thing.

你可以使用我写的:AsyncContext is capable of installing a SynchronizationContext and running some asynchronous code and blocking until it completes