如何在 C# 中实现线程关联?

How do I implement thread affinity in C#?

我有一个需要线程关联的第三方 API。我在我的服务应用程序中使用 WCF 来处理来自客户端的请求,然后将这些请求委托给此 API。由于 WCF 使用线程池来处理请求,我尝试使用以下代码解决此问题(使用 SynchronizationContext class):

using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.Threading;

namespace MyAPIService
{
  [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
  public class MyService: IService
  {
      ThirdPartyAPI m_api;
      SynchronizationContext m_context;

      MyService()
      {
        m_api = new ThirdPartyAPI();
      }

      public bool Connect(string ipaddress, int port)
      {
        m_context = (SynchronizationContext.Current == null) ? new SynchronizationContext() : SynchronizationContext.Current;
        return m_api.Connect(ipaddress, port);
      }

      public bool Disconnect()
      {
        throw new NotImplementedException();
      }

      public bool IsConnected()
      {
        return Send(() => m_api.IsConnected());
      }

      public TResult Send<TResult>(Func<TResult> func)
      {
        TResult retval = default(TResult);
        m_context.Send(new SendOrPostCallback((x) =>
        {
          retval = func();
        })
        , null);
        return retval;
      }

  }
 }
}

我认为这将允许我在调用 Connect 的同一线程上执行 IsConnected 方法,但从测试来看情况并非如此。 IsConnected 仍然在池中的任何线程上执行。我做错了什么?

如有任何帮助,我们将不胜感激。 非常感谢。

默认同步上下文在线程池上执行您的工作(因此,您没有线程关联)。

如果你想确保工作总是发布到同一个线程,你需要编写自己的同步上下文。例如:

public class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly BlockingCollection<(SendOrPostCallback callback, object state)> _queue;
    private readonly Thread _processingThread;

    public SingleThreadSynchronizationContext()
    {
        _queue = new BlockingCollection<(SendOrPostCallback, object)>();
        _processingThread = new Thread(Process) { IsBackground = true };
        _processingThread.Start();
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        using (var mutex = new ManualResetEventSlim())
        {
            var callback = new SendOrPostCallback(s =>
            {
                d(s);
                mutex.Set();
            });

            _queue.Add((callback, state));
            mutex.Wait();
        }
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        _queue.Add((d, state));
    }

    public override SynchronizationContext CreateCopy()
    {
        return this;
    }

    private void Process()
    {
        SetSynchronizationContext(this);

        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.callback(item.state);
        }
    }
}

请注意,此同步上下文假定回调中未捕获的异常将使进程崩溃。如果不是这种情况(例如,因为您有一个全局异常处理程序),那么您应该添加一些错误处理(在 ProcessSend 中)。