WCF 代理包装器 - 离线缓冲数据,重新连接并在线发送一次

WCF proxy wrapper - buffering data offline, reconnect and send once back online

我已经围绕 WCF class 编写了一个包装器,它将缓冲发送数据,并在重新连接后发送。我有以下问题请教:

  1. 我是否需要订阅所有通道故障事件,或者 ChannelFactory.Faulted 是否足够?

  2. 这是在许多深夜后写成的,希望有新的眼睛,有人能看到下面的任何令人震惊的地方吗?

  3. 关于实施一般重新连接和最佳实践的任何推荐读物(我看过 Polly 库,但看不到它如何在后台线程中执行重新连接)所以我没有重新发明轮子?任何通用框架来实现我试图写的东西

请看下面我的代码:

public class ClientProxyWrapper
{
    private readonly Action<Dictionary<string, string>> _incomingCallCallback;
    private ScreenPopClient _proxy;
    private volatile bool _isConnecting;
    private volatile bool _isReady;
    private readonly object _lock = new object();
    private readonly InstanceContext _context;
    private TreadSafeStack<Dictionary<string,string> _offlineValues = new TreadSafeStack<Dictionary<string,string>();

    public ClientProxyWrapper(Action<Dictionary<string,string>> incomingCallCallback)
    {
        _isReady = false;
        _incomingCallCallback = incomingCallCallback;
        _context = new InstanceContext(this);
        StartConnectTask(0);
    }

    void CreateNewProxy()
    {
        _proxy = new ScreenPopClient(_context);
        _proxy.ChannelFactory.Faulted += OnChannelFault;
        _proxy.InnerChannel.Faulted += OnChannelFault;            
        _proxy.InnerDuplexChannel.Faulted += OnChannelFault;            
    }

    void StartConnectTask(int startDelay)
    {
        lock (_lock)
        {                
            if (_isConnecting) return; // we are already connecting
            _isConnecting = true;
        }

        Task.Run(() =>
        {
            while (true)
            {
                Thread.Sleep(startDelay);
                try
                {
                    CreateNewProxy();
                    _proxy.Login(Environment.UserName);

                    Dictionary<string,string> toSend;
                    while(_offlineValues.Get(out toSend))
                    {
                        _proxy.Update(toSend);
                    }
                    _isConnecting = false;
                    _isReady = true;
                    return;
                }
                catch (Exception)
                {
                }
            }
        });
    }

    void OnChannelFault(object sender, EventArgs e)
    {
        ((ICommunicationObject)sender).Abort();
        // reconnect...
        StartConnectTask(5000);
    }

    void Update(Dictionary<string,string> values)
    {
        if(_isReady)
        {
            // put values into a thread safe queue, to be sent once we come back online
            _offlineValues.Add(values);
        }
        else
        {
            _proxy.Update(values);
        }
    }
}

只看代码 - 一般印象:

使用 Task.Run 运行 同步方法有点笨拙 - 并且通常不被认为是对 Tasks 的良好使用。

看来你有能力丢掉工作。例如,您如何处理尚未发送的数据 - 由于连接问题或其他原因。我认为有几个开放的边缘案例。例如,您似乎无法从多个线程调用 Update 而不会丢失任何东西。如果您有 _offlineValues 并且要关机,会发生什么情况?

我不知道 TreadSafeStack 是什么。那只是样本?您认为可能是 ConcurrentQueue?

演员之间的关系似乎很尴尬。一般来说,我认为您会有一个处理器,它可以将通信与仅将工作提交给处理器的工作排队方法分开。处理器将检测代理的状态并根据需要重新连接它。

工作的排队很容易成为线程安全的。您可能不想在排队时等待服务。最好将长 运行ning 发送与快速 运行ning 提交分开。

我有一个类似的东西 - 它只是 shell - 我去掉了所有让我的东西与你的东西不同的噪音 - 但也许你可以从中得到我在说什么(我的排队类似于您的更新)。

class Sender
{
  /// <summary>Synchronization primitive</summary>
  private static object sync = new object( );

  /// <summary>Processor wait this long before checking for work (if not nudged)</summary>
  private const int waitTimeOutMilliseconds = 5000;

  /// <summary>What the processor waits on to hear that there are things to do</summary>
  private ManualResetEvent nudge = new ManualResetEvent( false );

  /// <summary>The processor sets this when it's been signaled to stop and message processing is complete</summary>
  private ManualResetEvent done = new ManualResetEvent( false );

  /// <summary>A flag that will be set when the host wants to terminate the message queue processor</summary>
  private bool shutDown = true;

  /// <summary>A queue of messages that need to be sent</summary>
  private Queue<string> queue = new Queue<string>( );


  /// <summary>Puts a message in the queue</summary>
  public void Enqueue( string message )
  {
    lock ( sync )
    {
      if ( shutDown ) throw new InvalidOperationException( "Shutting down - not accepting messages" );
      queue.Enqueue( message );
      nudge.Set( );
    }
  }

  /// <summary>Shuts down without waiting</summary>
  public void Stop( )
  {
    lock ( sync )
    {
      shutDown = true;
      nudge.Set( );
    }
  }

  /// <summary>Starts queue processing</summary>
  public void Start( )
  {
    if ( WaitForShutdown( 5000 ) )
    {
      lock ( sync )
      {
        shutDown = false;
        done.Reset( );
        nudge.Reset( );
        ThreadPool.QueueUserWorkItem( Processor );
      }
    }
    else
    {
      throw new InvalidOperationException( "Couldn't start - that's bad!" );
    }
  }

  /// <summary>Stops accepting messages on the queue, triggers shutdown and waits for the worker thread to complete.</summary>
  /// <param name="millisecondsToWait"></param>
  /// <returns>True if the thread stops in the time specified, false otherwise</returns>
  private bool WaitForShutdown( int millisecondsToWait )
  {
    Stop( );
    lock ( sync )
    {
      if ( shutDown ) return true;
    }
    return done.WaitOne( millisecondsToWait );
  }


  /// <summary>Worker thread method that writes the message</summary>
  private void Processor( object state )
  {

    var processList = new List<string>( );  //--> work we'll take out of the queue
    var cancel = false;  //--> a local representation of shutdown, we'll obtain while under a lock

    while ( true )
    {
      nudge.WaitOne( waitTimeOutMilliseconds );

      lock ( sync )
      {
        cancel = shutDown;
        while ( queue.Any( ) )
        {
          processList.Add( queue.Dequeue( ) );
        }
        nudge.Reset( );
      }

      foreach ( var message in processList )
      {
        try
        {
          // send to service...
        }
        catch ( Exception ex )
        {
          // reconnect or do whatever is appropriate to handle issues...
        }
        if ( cancel ) break;
      }

      processList.Clear( );

      if ( cancel )
      {
        done.Set( );
        return;
      }
    }
  }
}

如果 ClientProxyWrapper 是 运行 的进程意外终止,所有消息会怎样?

您的邮件没有持久性,因此不能保证它们一定会送达。如果你使用像 MSMQ 这样的队列系统来存储消息然后从那里处理它们会更好。

消息队列可以跨国化,减少丢失消息的几率