WCF 代理包装器 - 离线缓冲数据,重新连接并在线发送一次
WCF proxy wrapper - buffering data offline, reconnect and send once back online
我已经围绕 WCF class 编写了一个包装器,它将缓冲发送数据,并在重新连接后发送。我有以下问题请教:
我是否需要订阅所有通道故障事件,或者 ChannelFactory.Faulted 是否足够?
这是在许多深夜后写成的,希望有新的眼睛,有人能看到下面的任何令人震惊的地方吗?
关于实施一般重新连接和最佳实践的任何推荐读物(我看过 Polly 库,但看不到它如何在后台线程中执行重新连接)所以我没有重新发明轮子?任何通用框架来实现我试图写的东西
请看下面我的代码:
public class ClientProxyWrapper
{
private readonly Action<Dictionary<string, string>> _incomingCallCallback;
private ScreenPopClient _proxy;
private volatile bool _isConnecting;
private volatile bool _isReady;
private readonly object _lock = new object();
private readonly InstanceContext _context;
private TreadSafeStack<Dictionary<string,string> _offlineValues = new TreadSafeStack<Dictionary<string,string>();
public ClientProxyWrapper(Action<Dictionary<string,string>> incomingCallCallback)
{
_isReady = false;
_incomingCallCallback = incomingCallCallback;
_context = new InstanceContext(this);
StartConnectTask(0);
}
void CreateNewProxy()
{
_proxy = new ScreenPopClient(_context);
_proxy.ChannelFactory.Faulted += OnChannelFault;
_proxy.InnerChannel.Faulted += OnChannelFault;
_proxy.InnerDuplexChannel.Faulted += OnChannelFault;
}
void StartConnectTask(int startDelay)
{
lock (_lock)
{
if (_isConnecting) return; // we are already connecting
_isConnecting = true;
}
Task.Run(() =>
{
while (true)
{
Thread.Sleep(startDelay);
try
{
CreateNewProxy();
_proxy.Login(Environment.UserName);
Dictionary<string,string> toSend;
while(_offlineValues.Get(out toSend))
{
_proxy.Update(toSend);
}
_isConnecting = false;
_isReady = true;
return;
}
catch (Exception)
{
}
}
});
}
void OnChannelFault(object sender, EventArgs e)
{
((ICommunicationObject)sender).Abort();
// reconnect...
StartConnectTask(5000);
}
void Update(Dictionary<string,string> values)
{
if(_isReady)
{
// put values into a thread safe queue, to be sent once we come back online
_offlineValues.Add(values);
}
else
{
_proxy.Update(values);
}
}
}
只看代码 - 一般印象:
使用 Task.Run 运行 同步方法有点笨拙 - 并且通常不被认为是对 Tasks 的良好使用。
看来你有能力丢掉工作。例如,您如何处理尚未发送的数据 - 由于连接问题或其他原因。我认为有几个开放的边缘案例。例如,您似乎无法从多个线程调用 Update 而不会丢失任何东西。如果您有 _offlineValues 并且要关机,会发生什么情况?
我不知道 TreadSafeStack 是什么。那只是样本?您认为可能是 ConcurrentQueue?
演员之间的关系似乎很尴尬。一般来说,我认为您会有一个处理器,它可以将通信与仅将工作提交给处理器的工作排队方法分开。处理器将检测代理的状态并根据需要重新连接它。
工作的排队很容易成为线程安全的。您可能不想在排队时等待服务。最好将长 运行ning 发送与快速 运行ning 提交分开。
我有一个类似的东西 - 它只是 shell - 我去掉了所有让我的东西与你的东西不同的噪音 - 但也许你可以从中得到我在说什么(我的排队类似于您的更新)。
class Sender
{
/// <summary>Synchronization primitive</summary>
private static object sync = new object( );
/// <summary>Processor wait this long before checking for work (if not nudged)</summary>
private const int waitTimeOutMilliseconds = 5000;
/// <summary>What the processor waits on to hear that there are things to do</summary>
private ManualResetEvent nudge = new ManualResetEvent( false );
/// <summary>The processor sets this when it's been signaled to stop and message processing is complete</summary>
private ManualResetEvent done = new ManualResetEvent( false );
/// <summary>A flag that will be set when the host wants to terminate the message queue processor</summary>
private bool shutDown = true;
/// <summary>A queue of messages that need to be sent</summary>
private Queue<string> queue = new Queue<string>( );
/// <summary>Puts a message in the queue</summary>
public void Enqueue( string message )
{
lock ( sync )
{
if ( shutDown ) throw new InvalidOperationException( "Shutting down - not accepting messages" );
queue.Enqueue( message );
nudge.Set( );
}
}
/// <summary>Shuts down without waiting</summary>
public void Stop( )
{
lock ( sync )
{
shutDown = true;
nudge.Set( );
}
}
/// <summary>Starts queue processing</summary>
public void Start( )
{
if ( WaitForShutdown( 5000 ) )
{
lock ( sync )
{
shutDown = false;
done.Reset( );
nudge.Reset( );
ThreadPool.QueueUserWorkItem( Processor );
}
}
else
{
throw new InvalidOperationException( "Couldn't start - that's bad!" );
}
}
/// <summary>Stops accepting messages on the queue, triggers shutdown and waits for the worker thread to complete.</summary>
/// <param name="millisecondsToWait"></param>
/// <returns>True if the thread stops in the time specified, false otherwise</returns>
private bool WaitForShutdown( int millisecondsToWait )
{
Stop( );
lock ( sync )
{
if ( shutDown ) return true;
}
return done.WaitOne( millisecondsToWait );
}
/// <summary>Worker thread method that writes the message</summary>
private void Processor( object state )
{
var processList = new List<string>( ); //--> work we'll take out of the queue
var cancel = false; //--> a local representation of shutdown, we'll obtain while under a lock
while ( true )
{
nudge.WaitOne( waitTimeOutMilliseconds );
lock ( sync )
{
cancel = shutDown;
while ( queue.Any( ) )
{
processList.Add( queue.Dequeue( ) );
}
nudge.Reset( );
}
foreach ( var message in processList )
{
try
{
// send to service...
}
catch ( Exception ex )
{
// reconnect or do whatever is appropriate to handle issues...
}
if ( cancel ) break;
}
processList.Clear( );
if ( cancel )
{
done.Set( );
return;
}
}
}
}
如果 ClientProxyWrapper
是 运行 的进程意外终止,所有消息会怎样?
您的邮件没有持久性,因此不能保证它们一定会送达。如果你使用像 MSMQ 这样的队列系统来存储消息然后从那里处理它们会更好。
消息队列可以跨国化,减少丢失消息的几率
我已经围绕 WCF class 编写了一个包装器,它将缓冲发送数据,并在重新连接后发送。我有以下问题请教:
我是否需要订阅所有通道故障事件,或者 ChannelFactory.Faulted 是否足够?
这是在许多深夜后写成的,希望有新的眼睛,有人能看到下面的任何令人震惊的地方吗?
关于实施一般重新连接和最佳实践的任何推荐读物(我看过 Polly 库,但看不到它如何在后台线程中执行重新连接)所以我没有重新发明轮子?任何通用框架来实现我试图写的东西
请看下面我的代码:
public class ClientProxyWrapper
{
private readonly Action<Dictionary<string, string>> _incomingCallCallback;
private ScreenPopClient _proxy;
private volatile bool _isConnecting;
private volatile bool _isReady;
private readonly object _lock = new object();
private readonly InstanceContext _context;
private TreadSafeStack<Dictionary<string,string> _offlineValues = new TreadSafeStack<Dictionary<string,string>();
public ClientProxyWrapper(Action<Dictionary<string,string>> incomingCallCallback)
{
_isReady = false;
_incomingCallCallback = incomingCallCallback;
_context = new InstanceContext(this);
StartConnectTask(0);
}
void CreateNewProxy()
{
_proxy = new ScreenPopClient(_context);
_proxy.ChannelFactory.Faulted += OnChannelFault;
_proxy.InnerChannel.Faulted += OnChannelFault;
_proxy.InnerDuplexChannel.Faulted += OnChannelFault;
}
void StartConnectTask(int startDelay)
{
lock (_lock)
{
if (_isConnecting) return; // we are already connecting
_isConnecting = true;
}
Task.Run(() =>
{
while (true)
{
Thread.Sleep(startDelay);
try
{
CreateNewProxy();
_proxy.Login(Environment.UserName);
Dictionary<string,string> toSend;
while(_offlineValues.Get(out toSend))
{
_proxy.Update(toSend);
}
_isConnecting = false;
_isReady = true;
return;
}
catch (Exception)
{
}
}
});
}
void OnChannelFault(object sender, EventArgs e)
{
((ICommunicationObject)sender).Abort();
// reconnect...
StartConnectTask(5000);
}
void Update(Dictionary<string,string> values)
{
if(_isReady)
{
// put values into a thread safe queue, to be sent once we come back online
_offlineValues.Add(values);
}
else
{
_proxy.Update(values);
}
}
}
只看代码 - 一般印象:
使用 Task.Run 运行 同步方法有点笨拙 - 并且通常不被认为是对 Tasks 的良好使用。
看来你有能力丢掉工作。例如,您如何处理尚未发送的数据 - 由于连接问题或其他原因。我认为有几个开放的边缘案例。例如,您似乎无法从多个线程调用 Update 而不会丢失任何东西。如果您有 _offlineValues 并且要关机,会发生什么情况?
我不知道 TreadSafeStack 是什么。那只是样本?您认为可能是 ConcurrentQueue?
演员之间的关系似乎很尴尬。一般来说,我认为您会有一个处理器,它可以将通信与仅将工作提交给处理器的工作排队方法分开。处理器将检测代理的状态并根据需要重新连接它。
工作的排队很容易成为线程安全的。您可能不想在排队时等待服务。最好将长 运行ning 发送与快速 运行ning 提交分开。
我有一个类似的东西 - 它只是 shell - 我去掉了所有让我的东西与你的东西不同的噪音 - 但也许你可以从中得到我在说什么(我的排队类似于您的更新)。
class Sender
{
/// <summary>Synchronization primitive</summary>
private static object sync = new object( );
/// <summary>Processor wait this long before checking for work (if not nudged)</summary>
private const int waitTimeOutMilliseconds = 5000;
/// <summary>What the processor waits on to hear that there are things to do</summary>
private ManualResetEvent nudge = new ManualResetEvent( false );
/// <summary>The processor sets this when it's been signaled to stop and message processing is complete</summary>
private ManualResetEvent done = new ManualResetEvent( false );
/// <summary>A flag that will be set when the host wants to terminate the message queue processor</summary>
private bool shutDown = true;
/// <summary>A queue of messages that need to be sent</summary>
private Queue<string> queue = new Queue<string>( );
/// <summary>Puts a message in the queue</summary>
public void Enqueue( string message )
{
lock ( sync )
{
if ( shutDown ) throw new InvalidOperationException( "Shutting down - not accepting messages" );
queue.Enqueue( message );
nudge.Set( );
}
}
/// <summary>Shuts down without waiting</summary>
public void Stop( )
{
lock ( sync )
{
shutDown = true;
nudge.Set( );
}
}
/// <summary>Starts queue processing</summary>
public void Start( )
{
if ( WaitForShutdown( 5000 ) )
{
lock ( sync )
{
shutDown = false;
done.Reset( );
nudge.Reset( );
ThreadPool.QueueUserWorkItem( Processor );
}
}
else
{
throw new InvalidOperationException( "Couldn't start - that's bad!" );
}
}
/// <summary>Stops accepting messages on the queue, triggers shutdown and waits for the worker thread to complete.</summary>
/// <param name="millisecondsToWait"></param>
/// <returns>True if the thread stops in the time specified, false otherwise</returns>
private bool WaitForShutdown( int millisecondsToWait )
{
Stop( );
lock ( sync )
{
if ( shutDown ) return true;
}
return done.WaitOne( millisecondsToWait );
}
/// <summary>Worker thread method that writes the message</summary>
private void Processor( object state )
{
var processList = new List<string>( ); //--> work we'll take out of the queue
var cancel = false; //--> a local representation of shutdown, we'll obtain while under a lock
while ( true )
{
nudge.WaitOne( waitTimeOutMilliseconds );
lock ( sync )
{
cancel = shutDown;
while ( queue.Any( ) )
{
processList.Add( queue.Dequeue( ) );
}
nudge.Reset( );
}
foreach ( var message in processList )
{
try
{
// send to service...
}
catch ( Exception ex )
{
// reconnect or do whatever is appropriate to handle issues...
}
if ( cancel ) break;
}
processList.Clear( );
if ( cancel )
{
done.Set( );
return;
}
}
}
}
如果 ClientProxyWrapper
是 运行 的进程意外终止,所有消息会怎样?
您的邮件没有持久性,因此不能保证它们一定会送达。如果你使用像 MSMQ 这样的队列系统来存储消息然后从那里处理它们会更好。
消息队列可以跨国化,减少丢失消息的几率