状态机和网络套接字——如何处理竞争条件
State machine and network socket - how to handle race conditions
我在我的 C# 网络项目中使用无状态,主要是因为它是添加功能的好方法,例如套接字连接后的线级授权、重新连接延迟等。
话虽如此,我 运行 陷入了我自己所做的一些竞争条件和僵局 - 就以下状态下处理此问题的最佳方法寻求建议:
enum State { Stopped, Disconnected, Connecting, Connected, Resetting }
enum Trigger { Start, Stop, Connect, SetConnectComplete, Reset, SetResetComplete }
class StateMachine : StateMachine<State, Trigger>
{
public StateMachine(Action OnDisconnected, Action OnConnecting, Action OnConnected, Action OnResetting) : base(State.Stopped)
{
this.Configure(State.Stopped)
.Permit(Trigger.Start, State.Disconnected);
this.Configure(State.Disconnected)
.OnEntry(OnDisconnected)
.Permit(Trigger.Connect, State.Connecting);
this.Configure(State.Connecting)
.OnEntry(OnConnecting)
.Permit(Trigger.SetConnectComplete, State.Connected)
.Permit(Trigger.Reset, State.Resetting);
this.Configure(State.Connected)
.OnEntry(OnConnected)
.Permit(Trigger.Reset, State.Resetting);
this.Configure(State.Resetting)
.OnEntry(OnResetting)
.Permit(Trigger.SetResetComplete, State.Disconnected);
}
}
它的功能是套接字会自动重新连接,并在连接时启动接收循环。如果发生套接字错误,它应该返回以释放资源,然后循环返回以重新启动。
然而,当我处理该对象时,连接的套接字中止,这也释放了资源,而且它会尝试自行等待。
我相信这与等待自身的线程有关,所以我的 design/state 结构肯定是根本不存在的,感谢有关可以完全避免死锁的更好结构的指针。
public class ManagedWebSocket : IDisposable
{
readonly StateMachine stateMachine;
Task backgroundReaderTask;
private ClientWebSocket webSocket;
private readonly ITargetBlock<byte[]> target;
private readonly ILogger<ManagedWebSocket> logger;
private CancellationTokenSource cancellationTokenSource;
bool isDisposing;
public ManagedWebSocket(string uri, ITargetBlock<byte[]> target, ILogger<ManagedWebSocket> logger)
{
this.stateMachine = new StateMachine(OnDisconnected, OnConnecting, OnConnected, OnResetting);
this.target = target;
this.logger = logger;
}
private void OnConnecting()
{
this.backgroundReaderTask = Task.Run(async () =>
{
this.cancellationTokenSource = new CancellationTokenSource();
this.webSocket = new ClientWebSocket();
webSocket.Options.KeepAliveInterval = KeepAliveInterval;
try
{
await this.webSocket.ConnectAsync(this.uri, cancellationTokenSource.Token);
}
catch(WebSocketException ex)
{
this.logger.LogError(ex.Message, ex);
await this.stateMachine.FireAsync(Trigger.Reset);
}
this.stateMachine.Fire(Trigger.SetConnectComplete);
});
}
private void OnDisconnected()
{
if (isDisposing == false)
this.stateMachine.Fire(Trigger.Connect);
}
private void OnResetting()
{
FreeResources();
this.stateMachine.Fire(Trigger.SetResetComplete);
}
private void OnConnected()
{
this.backgroundReaderTask = Task.Run( async () => {
try
{
// returns when the internal frame loop completes with websocket close, or by throwing an exception
await this.webSocket.ReceiveFramesLoopAsync(target.SendAsync, 2048, this.cancellationTokenSource.Token);
}
catch (Exception ex)
{
this.logger.LogError(ex.Message, ex);
}
await this.stateMachine.FireAsync(Trigger.Reset);
});
}
public async Task SendAsync(byte[] data, WebSocketMessageType webSocketMessageType)
{
if (this.stateMachine.State != State.Connected)
throw new Exception($"{nameof(ManagedWebSocket)} is not yet connected.");
try
{
await webSocket
.SendAsChunksAsync(data, webSocketMessageType, 2048, this.cancellationTokenSource.Token)
.ConfigureAwait(false);
}
catch (Exception ex)
{
this.logger.LogError(ex, ex.Message);
await this.stateMachine.FireAsync(Trigger.Reset);
}
}
public void Start()
{
this.stateMachine.Fire(Trigger.Start);
}
public void FreeResources()
{
this.logger.LogDebug($"{nameof(ManagedWebSocket.FreeResources)}");
this.cancellationTokenSource?.Cancel();
this.backgroundReaderTask?.Wait();
this.cancellationTokenSource?.Dispose();
this.backgroundReaderTask?.Dispose();
}
public void Dispose()
{
if (isDisposing)
return;
isDisposing = true;
FreeResources();
}
}
我猜死锁是由在 OnResetting()
中调用 FreeResources();
引起的,因为 FreeResources();
正在等待 backgroundReaderTask
但在 backgroundReaderTask
中你正在等待 OnResetting()
通过 await this.stateMachine.FireAsync(Trigger.Reset);
.
作为某种解决方法,您可以省略触发重置的“await”关键字,因为它无论如何都会处理整个对象。
另请注意,如果先前在 OnConnecting()
中抛出异常,似乎没有理由调用 this.stateMachine.Fire(Trigger.SetConnectComplete);
- 只需将其移至 try-block。
此外,作为某种最佳实践和旁注,请尝试遵循 recommended dispose pattern
我在我的 C# 网络项目中使用无状态,主要是因为它是添加功能的好方法,例如套接字连接后的线级授权、重新连接延迟等。
话虽如此,我 运行 陷入了我自己所做的一些竞争条件和僵局 - 就以下状态下处理此问题的最佳方法寻求建议:
enum State { Stopped, Disconnected, Connecting, Connected, Resetting }
enum Trigger { Start, Stop, Connect, SetConnectComplete, Reset, SetResetComplete }
class StateMachine : StateMachine<State, Trigger>
{
public StateMachine(Action OnDisconnected, Action OnConnecting, Action OnConnected, Action OnResetting) : base(State.Stopped)
{
this.Configure(State.Stopped)
.Permit(Trigger.Start, State.Disconnected);
this.Configure(State.Disconnected)
.OnEntry(OnDisconnected)
.Permit(Trigger.Connect, State.Connecting);
this.Configure(State.Connecting)
.OnEntry(OnConnecting)
.Permit(Trigger.SetConnectComplete, State.Connected)
.Permit(Trigger.Reset, State.Resetting);
this.Configure(State.Connected)
.OnEntry(OnConnected)
.Permit(Trigger.Reset, State.Resetting);
this.Configure(State.Resetting)
.OnEntry(OnResetting)
.Permit(Trigger.SetResetComplete, State.Disconnected);
}
}
它的功能是套接字会自动重新连接,并在连接时启动接收循环。如果发生套接字错误,它应该返回以释放资源,然后循环返回以重新启动。
然而,当我处理该对象时,连接的套接字中止,这也释放了资源,而且它会尝试自行等待。
我相信这与等待自身的线程有关,所以我的 design/state 结构肯定是根本不存在的,感谢有关可以完全避免死锁的更好结构的指针。
public class ManagedWebSocket : IDisposable
{
readonly StateMachine stateMachine;
Task backgroundReaderTask;
private ClientWebSocket webSocket;
private readonly ITargetBlock<byte[]> target;
private readonly ILogger<ManagedWebSocket> logger;
private CancellationTokenSource cancellationTokenSource;
bool isDisposing;
public ManagedWebSocket(string uri, ITargetBlock<byte[]> target, ILogger<ManagedWebSocket> logger)
{
this.stateMachine = new StateMachine(OnDisconnected, OnConnecting, OnConnected, OnResetting);
this.target = target;
this.logger = logger;
}
private void OnConnecting()
{
this.backgroundReaderTask = Task.Run(async () =>
{
this.cancellationTokenSource = new CancellationTokenSource();
this.webSocket = new ClientWebSocket();
webSocket.Options.KeepAliveInterval = KeepAliveInterval;
try
{
await this.webSocket.ConnectAsync(this.uri, cancellationTokenSource.Token);
}
catch(WebSocketException ex)
{
this.logger.LogError(ex.Message, ex);
await this.stateMachine.FireAsync(Trigger.Reset);
}
this.stateMachine.Fire(Trigger.SetConnectComplete);
});
}
private void OnDisconnected()
{
if (isDisposing == false)
this.stateMachine.Fire(Trigger.Connect);
}
private void OnResetting()
{
FreeResources();
this.stateMachine.Fire(Trigger.SetResetComplete);
}
private void OnConnected()
{
this.backgroundReaderTask = Task.Run( async () => {
try
{
// returns when the internal frame loop completes with websocket close, or by throwing an exception
await this.webSocket.ReceiveFramesLoopAsync(target.SendAsync, 2048, this.cancellationTokenSource.Token);
}
catch (Exception ex)
{
this.logger.LogError(ex.Message, ex);
}
await this.stateMachine.FireAsync(Trigger.Reset);
});
}
public async Task SendAsync(byte[] data, WebSocketMessageType webSocketMessageType)
{
if (this.stateMachine.State != State.Connected)
throw new Exception($"{nameof(ManagedWebSocket)} is not yet connected.");
try
{
await webSocket
.SendAsChunksAsync(data, webSocketMessageType, 2048, this.cancellationTokenSource.Token)
.ConfigureAwait(false);
}
catch (Exception ex)
{
this.logger.LogError(ex, ex.Message);
await this.stateMachine.FireAsync(Trigger.Reset);
}
}
public void Start()
{
this.stateMachine.Fire(Trigger.Start);
}
public void FreeResources()
{
this.logger.LogDebug($"{nameof(ManagedWebSocket.FreeResources)}");
this.cancellationTokenSource?.Cancel();
this.backgroundReaderTask?.Wait();
this.cancellationTokenSource?.Dispose();
this.backgroundReaderTask?.Dispose();
}
public void Dispose()
{
if (isDisposing)
return;
isDisposing = true;
FreeResources();
}
}
我猜死锁是由在 OnResetting()
中调用 FreeResources();
引起的,因为 FreeResources();
正在等待 backgroundReaderTask
但在 backgroundReaderTask
中你正在等待 OnResetting()
通过 await this.stateMachine.FireAsync(Trigger.Reset);
.
作为某种解决方法,您可以省略触发重置的“await”关键字,因为它无论如何都会处理整个对象。
另请注意,如果先前在 OnConnecting()
中抛出异常,似乎没有理由调用 this.stateMachine.Fire(Trigger.SetConnectComplete);
- 只需将其移至 try-block。
此外,作为某种最佳实践和旁注,请尝试遵循 recommended dispose pattern