继承自 Stephen Cleary 的 Nito.Disposables NuGet 包
Inheriting from Stephen Cleary's Nito.Disposables NuGet package
我看了 Stephen Cleary 的 Nito.Disposables NuGet 包的实现,它看起来非常适合我的场景,但我找不到任何关于如何继承它的示例。
我的想法是把 UnsubscribeAsync().GetAwaiter().GetResult(); 改成 await UnsubscribeAsync();,这意味着它应该被包装成一个 IAsyncDisposable。对于一个密封(sealed)类,我该如何实现这一点?
/// <summary>
/// Trade manager that subscribes to live account and candle streams through an
/// <see cref="IBotClient"/> and processes candle updates per symbol.
/// </summary>
public sealed class LiveTradeManager : ITradeManager, IDisposable
{
    private bool _disposed;
    private readonly ILogger<LiveTradeManager> _logger;
    private readonly TradeOptions _tradeOptions;
    private readonly IBotClient _client;
    private string _listenKey;
    private UpdateSubscription _candleSubscription, _accountUpdateSubscription;
    private IDictionary<string, Channel<IBinanceStreamKlineData>> _channels;

    /// <summary>
    /// Creates the manager and resolves the bot client selected by the exchange options.
    /// </summary>
    /// <param name="logger">Logger used for candle and error messages.</param>
    /// <param name="tradeOptions">Trade options; their <c>Symbols</c> determine the candle subscriptions.</param>
    /// <param name="exchangeOptions">Exchange options; their <c>BotClientType</c> selects the client.</param>
    /// <param name="clientFactory">Factory that produces the <see cref="IBotClient"/>.</param>
    public LiveTradeManager(ILogger<LiveTradeManager> logger, IOptions<TradeOptions> tradeOptions, IOptions<ExchangeOptions> exchangeOptions, IBotClientFactory clientFactory)
    {
        _logger = logger;
        _tradeOptions = tradeOptions.Value;
        _client = clientFactory.GetBotClient(exchangeOptions.Value.BotClientType);
    }

    /// <summary>When <c>true</c>, incoming candle updates are ignored.</summary>
    public bool IsPaused { get; set; }

    /// <summary>
    /// Subscribes to the streams and processes updates until the token is cancelled.
    /// Cancellation is swallowed; other exceptions are passed to the <c>Handle</c> filter together
    /// with an error-logging callback.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the subscription and processing loops.</param>
    public async Task RunAsync(CancellationToken cancellationToken)
    {
        try
        {
            await SubscribeAsync(cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal shutdown path. OperationCanceledException is the base of
            // TaskCanceledException, so this also covers cancellations that are reported as a plain
            // OperationCanceledException (for example by the channel readers) instead of logging
            // them as unexpected errors.
        }
        catch (Exception ex) when (Handle(() => _logger.LogError(ex, "Unexpected error.")))
        {
        }
    }

    /// <summary>
    /// Subscribes to account and candle updates, starts the listen-key keep-alive loop and then
    /// consumes the per-symbol channels until cancellation.
    /// </summary>
    private async Task SubscribeAsync(CancellationToken cancellationToken)
    {
        // Subscribe to account updates
        _listenKey = await _client.GetListenKeyAsync(cancellationToken).ConfigureAwait(false);

        void OnOrderUpdate(BinanceStreamOrderUpdate order)
        {
            // order update logic
        }

        _accountUpdateSubscription = await _client.SubscribeToUserDataUpdatesAsync(_listenKey, OnOrderUpdate).ConfigureAwait(false);

        // Fire-and-forget loop that refreshes the listen key every 30 minutes until cancellation.
        _ = Task.Run(async () =>
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                await _client.KeepAliveListenKeyAsync(_listenKey, cancellationToken).ConfigureAwait(false);
                await Task.Delay(TimeSpan.FromMinutes(30), cancellationToken).ConfigureAwait(false);
            }
        }, cancellationToken);

        // Subscribe to candle updates
        var symbols = _tradeOptions.Symbols.Select(x => x.ToString()).ToList();

        // One channel per symbol with capacity 1 and DropOldest: a slow consumer only ever sees
        // the most recent update.
        _channels = symbols.ToDictionary(x => x, _ =>
            Channel.CreateBounded<IBinanceStreamKlineData>(new BoundedChannelOptions(1)
                {FullMode = BoundedChannelFullMode.DropOldest}));

        // async void because it is passed as a synchronous callback; nothing awaits it, so every
        // exception has to be handled inside the method.
        async void OnCandleReceived(IBinanceStreamKlineData data)
        {
            if (IsPaused) return;

            try
            {
                var ohlcv = data.Data.ToCandle();

                if (data.Data.Final)
                {
                    _logger.LogInformation(
                        $"[{data.Symbol}] Finalized candle | Open time: {ohlcv.Timestamp.ToDateTimeFormat()} | Price: {ohlcv.Close}");

                    _ = Task.Run(async () =>
                    {
                        await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
                    }, cancellationToken);
                }
                else
                {
                    _logger.LogInformation(
                        $"[{data.Symbol}] Candle update | Open time: {ohlcv.Timestamp.ToDateTimeFormat()} | Price: {ohlcv.Close}");

                    await _channels[data.Symbol].Writer.WriteAsync(data, cancellationToken).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException)
            {
                // Shutdown in progress; nothing to do.
            }
            catch (Exception ex)
            {
                // An exception escaping an async void method cannot be observed by any caller and
                // would be rethrown outside this callback, so log it here instead.
                _logger.LogError(ex, "Unexpected error while processing candle update.");
            }
        }

        _candleSubscription = await _client
            .SubscribeToCandleUpdatesAsync(symbols, KlineInterval.OneMinute, OnCandleReceived)
            .ConfigureAwait(false);

        var tasks = _channels.Values.Select(async channel =>
        {
            await foreach (var data in channel.Reader.ReadAllAsync(cancellationToken))
            {
                // long-running logic...
                await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
            }
        });

        // NOTE: this would block further logic
        await Task.WhenAll(tasks).ConfigureAwait(false);
    }

    /// <summary>
    /// Stops the listen key, removes both stream subscriptions, then completes and clears the channels.
    /// Each step is skipped when the corresponding field was never set.
    /// </summary>
    private async Task UnsubscribeAsync()
    {
        // Unsubscribe account updates
        if (!string.IsNullOrEmpty(_listenKey))
        {
            await _client.StopListenKeyAsync(_listenKey).ConfigureAwait(false);
        }

        if (_accountUpdateSubscription != null)
        {
            await _client.UnsubscribeAsync(_accountUpdateSubscription).ConfigureAwait(false);
        }

        // Unsubscribe candle updates
        if (_candleSubscription != null)
        {
            await _client.UnsubscribeAsync(_candleSubscription).ConfigureAwait(false);
        }

        // Channels
        if (_channels != null)
        {
            foreach (var channel in _channels.Values)
            {
                channel.Writer.Complete();
            }

            _channels.Clear();
        }
    }

    /// <summary>Unsubscribes from all streams; repeated calls are no-ops once disposal has completed.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (_disposed) return;

        if (disposing)
        {
            // Blocks the calling thread until the asynchronous unsubscribe has finished.
            UnsubscribeAsync().GetAwaiter().GetResult();
        }

        _disposed = true;
    }
}
/// <summary>
/// Background service that runs the configured trade manager together with the Discord client
/// and requests application shutdown once they finish or fail.
/// </summary>
public class BotManagerService : BackgroundService
{
    private readonly IHostApplicationLifetime _hostApplicationLifetime;
    private readonly IDiscordClient _discordClient;
    private readonly ITradeManager _tradeManager;

    /// <summary>
    /// Stores the collaborators and resolves the trade manager selected by
    /// <c>options.Value.TradeManagerType</c>.
    /// </summary>
    public BotManagerService(
        IHostApplicationLifetime hostApplicationLifetime,
        IOptions<ExchangeOptions> options,
        IDiscordClient discordClient,
        ITradeManagerFactory tradeManagerFactory)
    {
        _hostApplicationLifetime = hostApplicationLifetime;
        _discordClient = discordClient;

        var managerType = options.Value.TradeManagerType;
        _tradeManager = tradeManagerFactory.GetTradeManager(managerType);
    }

    /// <summary>
    /// Starts the trade manager and the Discord client, waits for both, and always calls
    /// <c>StopApplication</c> afterwards, whether they completed or threw.
    /// </summary>
    /// <param name="stoppingToken">Token handed to the trade manager's run loop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            // Start both operations before awaiting either so they run side by side.
            var tradeRun = _tradeManager.RunAsync(stoppingToken);
            var discordRun = _discordClient.StartAsync();

            await Task.WhenAll(tradeRun, discordRun).ConfigureAwait(false);
        }
        finally
        {
            _hostApplicationLifetime.StopApplication();
        }
    }
}
I couldn't find examples on how to inherit from it.
Nito.Disposables,和我写的绝大多数代码一样,都是为组合而非继承(composition rather than inheritance)而编写的。
因此,如果您有一个需要实现 IAsyncDisposable 的类型,它应该包含一个 IAsyncDisposable 实现,并将其接口方法转发给所包含的那个对象:
// Composition-based sketch: the class implements IAsyncDisposable by holding an AsyncDisposable
// and forwarding DisposeAsync to it. The "..." lines stand for the members elided from this sketch.
public sealed class LiveTradeManager : ITradeManager, IAsyncDisposable
{
// Contained disposable that DisposeAsync below delegates to.
private readonly AsyncDisposable _disposable;
...
public LiveTradeManager(...)
{
...
// The delegate awaits UnsubscribeAsync; presumably AsyncDisposable invokes it on disposal
// (confirm against the Nito.Disposables documentation).
_disposable = new(async () => await UnsubscribeAsync());
}
// Forwards the interface method to the contained AsyncDisposable.
public ValueTask DisposeAsync() => _disposable.DisposeAsync();
}
我看了 Stephen Cleary 的 Nito.Disposables NuGet 包的实现,它看起来非常适合我的场景,但我找不到任何关于如何继承它的示例。
我的想法是把 UnsubscribeAsync().GetAwaiter().GetResult(); 改成 await UnsubscribeAsync();,这意味着它应该被包装成一个 IAsyncDisposable。对于一个密封(sealed)类,我该如何实现这一点?
/// <summary>
/// Trade manager that subscribes to live account and candle streams through an
/// <see cref="IBotClient"/> and processes candle updates per symbol.
/// </summary>
public sealed class LiveTradeManager : ITradeManager, IDisposable
{
    private bool _disposed;
    private readonly ILogger<LiveTradeManager> _logger;
    private readonly TradeOptions _tradeOptions;
    private readonly IBotClient _client;
    private string _listenKey;
    private UpdateSubscription _candleSubscription, _accountUpdateSubscription;
    private IDictionary<string, Channel<IBinanceStreamKlineData>> _channels;

    /// <summary>
    /// Creates the manager and resolves the bot client selected by the exchange options.
    /// </summary>
    /// <param name="logger">Logger used for candle and error messages.</param>
    /// <param name="tradeOptions">Trade options; their <c>Symbols</c> determine the candle subscriptions.</param>
    /// <param name="exchangeOptions">Exchange options; their <c>BotClientType</c> selects the client.</param>
    /// <param name="clientFactory">Factory that produces the <see cref="IBotClient"/>.</param>
    public LiveTradeManager(ILogger<LiveTradeManager> logger, IOptions<TradeOptions> tradeOptions, IOptions<ExchangeOptions> exchangeOptions, IBotClientFactory clientFactory)
    {
        _logger = logger;
        _tradeOptions = tradeOptions.Value;
        _client = clientFactory.GetBotClient(exchangeOptions.Value.BotClientType);
    }

    /// <summary>When <c>true</c>, incoming candle updates are ignored.</summary>
    public bool IsPaused { get; set; }

    /// <summary>
    /// Subscribes to the streams and processes updates until the token is cancelled.
    /// Cancellation is swallowed; other exceptions are passed to the <c>Handle</c> filter together
    /// with an error-logging callback.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the subscription and processing loops.</param>
    public async Task RunAsync(CancellationToken cancellationToken)
    {
        try
        {
            await SubscribeAsync(cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal shutdown path. OperationCanceledException is the base of
            // TaskCanceledException, so this also covers cancellations that are reported as a plain
            // OperationCanceledException (for example by the channel readers) instead of logging
            // them as unexpected errors.
        }
        catch (Exception ex) when (Handle(() => _logger.LogError(ex, "Unexpected error.")))
        {
        }
    }

    /// <summary>
    /// Subscribes to account and candle updates, starts the listen-key keep-alive loop and then
    /// consumes the per-symbol channels until cancellation.
    /// </summary>
    private async Task SubscribeAsync(CancellationToken cancellationToken)
    {
        // Subscribe to account updates
        _listenKey = await _client.GetListenKeyAsync(cancellationToken).ConfigureAwait(false);

        void OnOrderUpdate(BinanceStreamOrderUpdate order)
        {
            // order update logic
        }

        _accountUpdateSubscription = await _client.SubscribeToUserDataUpdatesAsync(_listenKey, OnOrderUpdate).ConfigureAwait(false);

        // Fire-and-forget loop that refreshes the listen key every 30 minutes until cancellation.
        _ = Task.Run(async () =>
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                await _client.KeepAliveListenKeyAsync(_listenKey, cancellationToken).ConfigureAwait(false);
                await Task.Delay(TimeSpan.FromMinutes(30), cancellationToken).ConfigureAwait(false);
            }
        }, cancellationToken);

        // Subscribe to candle updates
        var symbols = _tradeOptions.Symbols.Select(x => x.ToString()).ToList();

        // One channel per symbol with capacity 1 and DropOldest: a slow consumer only ever sees
        // the most recent update.
        _channels = symbols.ToDictionary(x => x, _ =>
            Channel.CreateBounded<IBinanceStreamKlineData>(new BoundedChannelOptions(1)
                {FullMode = BoundedChannelFullMode.DropOldest}));

        // async void because it is passed as a synchronous callback; nothing awaits it, so every
        // exception has to be handled inside the method.
        async void OnCandleReceived(IBinanceStreamKlineData data)
        {
            if (IsPaused) return;

            try
            {
                var ohlcv = data.Data.ToCandle();

                if (data.Data.Final)
                {
                    _logger.LogInformation(
                        $"[{data.Symbol}] Finalized candle | Open time: {ohlcv.Timestamp.ToDateTimeFormat()} | Price: {ohlcv.Close}");

                    _ = Task.Run(async () =>
                    {
                        await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
                    }, cancellationToken);
                }
                else
                {
                    _logger.LogInformation(
                        $"[{data.Symbol}] Candle update | Open time: {ohlcv.Timestamp.ToDateTimeFormat()} | Price: {ohlcv.Close}");

                    await _channels[data.Symbol].Writer.WriteAsync(data, cancellationToken).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException)
            {
                // Shutdown in progress; nothing to do.
            }
            catch (Exception ex)
            {
                // An exception escaping an async void method cannot be observed by any caller and
                // would be rethrown outside this callback, so log it here instead.
                _logger.LogError(ex, "Unexpected error while processing candle update.");
            }
        }

        _candleSubscription = await _client
            .SubscribeToCandleUpdatesAsync(symbols, KlineInterval.OneMinute, OnCandleReceived)
            .ConfigureAwait(false);

        var tasks = _channels.Values.Select(async channel =>
        {
            await foreach (var data in channel.Reader.ReadAllAsync(cancellationToken))
            {
                // long-running logic...
                await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
            }
        });

        // NOTE: this would block further logic
        await Task.WhenAll(tasks).ConfigureAwait(false);
    }

    /// <summary>
    /// Stops the listen key, removes both stream subscriptions, then completes and clears the channels.
    /// Each step is skipped when the corresponding field was never set.
    /// </summary>
    private async Task UnsubscribeAsync()
    {
        // Unsubscribe account updates
        if (!string.IsNullOrEmpty(_listenKey))
        {
            await _client.StopListenKeyAsync(_listenKey).ConfigureAwait(false);
        }

        if (_accountUpdateSubscription != null)
        {
            await _client.UnsubscribeAsync(_accountUpdateSubscription).ConfigureAwait(false);
        }

        // Unsubscribe candle updates
        if (_candleSubscription != null)
        {
            await _client.UnsubscribeAsync(_candleSubscription).ConfigureAwait(false);
        }

        // Channels
        if (_channels != null)
        {
            foreach (var channel in _channels.Values)
            {
                channel.Writer.Complete();
            }

            _channels.Clear();
        }
    }

    /// <summary>Unsubscribes from all streams; repeated calls are no-ops once disposal has completed.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (_disposed) return;

        if (disposing)
        {
            // Blocks the calling thread until the asynchronous unsubscribe has finished.
            UnsubscribeAsync().GetAwaiter().GetResult();
        }

        _disposed = true;
    }
}
/// <summary>
/// Background service that runs the configured trade manager together with the Discord client
/// and requests application shutdown once they finish or fail.
/// </summary>
public class BotManagerService : BackgroundService
{
    private readonly IHostApplicationLifetime _hostApplicationLifetime;
    private readonly IDiscordClient _discordClient;
    private readonly ITradeManager _tradeManager;

    /// <summary>
    /// Stores the collaborators and resolves the trade manager selected by
    /// <c>options.Value.TradeManagerType</c>.
    /// </summary>
    public BotManagerService(
        IHostApplicationLifetime hostApplicationLifetime,
        IOptions<ExchangeOptions> options,
        IDiscordClient discordClient,
        ITradeManagerFactory tradeManagerFactory)
    {
        _hostApplicationLifetime = hostApplicationLifetime;
        _discordClient = discordClient;

        var managerType = options.Value.TradeManagerType;
        _tradeManager = tradeManagerFactory.GetTradeManager(managerType);
    }

    /// <summary>
    /// Starts the trade manager and the Discord client, waits for both, and always calls
    /// <c>StopApplication</c> afterwards, whether they completed or threw.
    /// </summary>
    /// <param name="stoppingToken">Token handed to the trade manager's run loop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            // Start both operations before awaiting either so they run side by side.
            var tradeRun = _tradeManager.RunAsync(stoppingToken);
            var discordRun = _discordClient.StartAsync();

            await Task.WhenAll(tradeRun, discordRun).ConfigureAwait(false);
        }
        finally
        {
            _hostApplicationLifetime.StopApplication();
        }
    }
}
I couldn't find examples on how to inherit from it.
Nito.Disposables,和我写的绝大多数代码一样,都是为组合而非继承(composition rather than inheritance)而编写的。
因此,如果您有一个需要实现 IAsyncDisposable 的类型,它应该包含一个 IAsyncDisposable 实现,并将其接口方法转发给所包含的那个对象:
// Composition-based sketch: the class implements IAsyncDisposable by holding an AsyncDisposable
// and forwarding DisposeAsync to it. The "..." lines stand for the members elided from this sketch.
public sealed class LiveTradeManager : ITradeManager, IAsyncDisposable
{
// Contained disposable that DisposeAsync below delegates to.
private readonly AsyncDisposable _disposable;
...
public LiveTradeManager(...)
{
...
// The delegate awaits UnsubscribeAsync; presumably AsyncDisposable invokes it on disposal
// (confirm against the Nito.Disposables documentation).
_disposable = new(async () => await UnsubscribeAsync());
}
// Forwards the interface method to the contained AsyncDisposable.
public ValueTask DisposeAsync() => _disposable.DisposeAsync();
}