System.Reactive - 一次处理未知数量的订阅
System.Reactive - Disposing unknown amount of subscriptions at once
我有数量未知的订阅要一次性处理掉,因为它们可能会变得很多。有没有一种机制可以使用 System.Reactive
一次性处理掉它们?也许,将它们包装成 Observable.Using(() => Disposable.Create...
会起作用吗?
client.Streams.PongStream.Subscribe(x =>
Log.Information($"Pong received ({x.Message})"));
client.Streams.FundingStream.Subscribe(response =>
{
var funding = response.Data;
Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
$"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
$"index price {funding.IndexPrice}");
});
client.Streams.AggregateTradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
$"price: {trade.Price} size: {trade.Quantity}");
});
client.Streams.TradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
$"price: {trade.Price} size: {trade.Quantity}");
});
client.Streams.OrderBookPartialStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book snapshot [{ob.Symbol}] " +
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
Task.Delay(500).Wait();
//OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
});
client.Streams.OrderBookDiffStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book diff [{ob.Symbol}] " +
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
});
client.Streams.BookTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Book ticker [{ob.Symbol}] " +
$"Best ask price: {ob.BestAskPrice} " +
$"Best ask qty: {ob.BestAskQty} " +
$"Best bid price: {ob.BestBidPrice} " +
$"Best bid qty: {ob.BestBidQty}");
});
client.Streams.KlineStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Kline [{ob.Symbol}] " +
$"Kline start time: {ob.Data.StartTime} " +
$"Kline close time: {ob.Data.CloseTime} " +
$"Interval: {ob.Data.Interval} " +
$"First trade ID: {ob.Data.FirstTradeId} " +
$"Last trade ID: {ob.Data.LastTradeId} " +
$"Open price: {ob.Data.OpenPrice} " +
$"Close price: {ob.Data.ClosePrice} " +
$"High price: {ob.Data.HighPrice} " +
$"Low price: {ob.Data.LowPrice} " +
$"Base asset volume: {ob.Data.BaseAssetVolume} " +
$"Number of trades: {ob.Data.NumberTrades} " +
$"Is this kline closed?: {ob.Data.IsClose} " +
$"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
$"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
$"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
$"Ignore: {ob.Data.Ignore} ");
});
client.Streams.MiniTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Mini-ticker [{ob.Symbol}] " +
$"Open price: {ob.OpenPrice} " +
$"Close price: {ob.ClosePrice} " +
$"High price: {ob.HighPrice} " +
$"Low price: {ob.LowPrice} " +
$"Base asset volume: {ob.BaseAssetVolume} " +
$"Quote asset volume: {ob.QuoteAssetVolume}");
});
以下是这些订阅的实际情况。
public class BinanceClientStreams
{
internal readonly Subject<PongResponse> PongSubject = new Subject<PongResponse>();
internal readonly Subject<TradeResponse> TradesSubject = new Subject<TradeResponse>();
internal readonly Subject<AggregatedTradeResponse> TradeBinSubject = new Subject<AggregatedTradeResponse>();
internal readonly Subject<OrderBookPartialResponse> OrderBookPartialSubject =
new Subject<OrderBookPartialResponse>();
internal readonly Subject<OrderBookDiffResponse> OrderBookDiffSubject = new Subject<OrderBookDiffResponse>();
internal readonly Subject<FundingResponse> FundingSubject = new Subject<FundingResponse>();
internal readonly Subject<BookTickerResponse> BookTickerSubject = new Subject<BookTickerResponse>();
internal readonly Subject<KlineResponse> KlineSubject = new Subject<KlineResponse>();
internal readonly Subject<MiniTickerResponse> MiniTickerSubject = new Subject<MiniTickerResponse>();
// PUBLIC
/// <summary>
/// Response stream to every ping request
/// </summary>
public IObservable<PongResponse> PongStream => PongSubject.AsObservable();
/// <summary>
/// Trades stream - emits every executed trade on Binance
/// </summary>
public IObservable<TradeResponse> TradesStream => TradesSubject.AsObservable();
/// <summary>
/// Chunk of trades - emits grouped trades together
/// </summary>
public IObservable<AggregatedTradeResponse> AggregateTradesStream => TradeBinSubject.AsObservable();
/// <summary>
/// Partial order book stream - emits small snapshot of the order book
/// </summary>
public IObservable<OrderBookPartialResponse> OrderBookPartialStream => OrderBookPartialSubject.AsObservable();
/// <summary>
/// Order book difference stream - emits small snapshot of the order book
/// </summary>
public IObservable<OrderBookDiffResponse> OrderBookDiffStream => OrderBookDiffSubject.AsObservable();
/// <summary>
/// Mark price and funding rate stream - emits mark price and funding rate for a single symbol pushed every 3 seconds or every second
/// </summary>
public IObservable<FundingResponse> FundingStream => FundingSubject.AsObservable();
/// <summary>
/// The best bid or ask's price or quantity in real-time for a specified symbol
/// </summary>
public IObservable<BookTickerResponse> BookTickerStream => BookTickerSubject.AsObservable();
/// <summary>
/// The Kline/Candlestick subscription, provide symbol and chart intervals
/// </summary>
public IObservable<KlineResponse> KlineStream => KlineSubject.AsObservable();
/// <summary>
/// Mini-ticker specified symbol statistics for the previous 24hrs
/// </summary>
public IObservable<MiniTickerResponse> MiniTickerStream => MiniTickerSubject.AsObservable();
}
我想你要找的是 CompositeDisposable
。您需要创建该实例 class 并将您的所有订阅添加到其中。
var compDisp = new CompositeDisposable();
compDisp.Add(client.Streams.PongStream.Subscribe(x =>
Log.Information($"Pong received ({x.Message})")));
compDisp.Add(client.Streams.FundingStream.Subscribe(response =>
{
var funding = response.Data;
Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
$"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
$"index price {funding.IndexPrice}");
}));
compDisp.Add(client.Streams.AggregateTradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
$"price: {trade.Price} size: {trade.Quantity}");
}));
compDisp.Add(client.Streams.TradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
$"price: {trade.Price} size: {trade.Quantity}");
}));
compDisp.Add(client.Streams.OrderBookPartialStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book snapshot [{ob.Symbol}] " +
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
Task.Delay(500).Wait();
//OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
}));
compDisp.Add(client.Streams.OrderBookDiffStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book diff [{ob.Symbol}] " +
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
}));
compDisp.Add(client.Streams.BookTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Book ticker [{ob.Symbol}] " +
$"Best ask price: {ob.BestAskPrice} " +
$"Best ask qty: {ob.BestAskQty} " +
$"Best bid price: {ob.BestBidPrice} " +
$"Best bid qty: {ob.BestBidQty}");
}));
compDisp.Add(client.Streams.KlineStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Kline [{ob.Symbol}] " +
$"Kline start time: {ob.Data.StartTime} " +
$"Kline close time: {ob.Data.CloseTime} " +
$"Interval: {ob.Data.Interval} " +
$"First trade ID: {ob.Data.FirstTradeId} " +
$"Last trade ID: {ob.Data.LastTradeId} " +
$"Open price: {ob.Data.OpenPrice} " +
$"Close price: {ob.Data.ClosePrice} " +
$"High price: {ob.Data.HighPrice} " +
$"Low price: {ob.Data.LowPrice} " +
$"Base asset volume: {ob.Data.BaseAssetVolume} " +
$"Number of trades: {ob.Data.NumberTrades} " +
$"Is this kline closed?: {ob.Data.IsClose} " +
$"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
$"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
$"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
$"Ignore: {ob.Data.Ignore} ");
}));
compDisp.Add(client.Streams.MiniTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Mini-ticker [{ob.Symbol}] " +
$"Open price: {ob.OpenPrice} " +
$"Close price: {ob.ClosePrice} " +
$"High price: {ob.HighPrice} " +
$"Low price: {ob.LowPrice} " +
$"Base asset volume: {ob.BaseAssetVolume} " +
$"Quote asset volume: {ob.QuoteAssetVolume}");
}));
一旦 compDisp
实例被处置,所有订阅将被处置。当然,何时完成取决于您应用程序的上下文。
编辑:
根据您的应用程序架构,WhenActivated
扩展方法也可能对您感兴趣。它在 ActivatableView
和 ActivatableViewModel
界面上定义,并接受每次视图(模型)被激活时调用的函数(即基本上当它显示在屏幕上时)。此函数还有一个 CompositeDisposable
作为参数,每次视图(模型)被停用时都会配置该参数。
编辑 2
刚刚意识到 DiposeWith
方法实际上是 ReactiveUI
框架和 WhenAcitvated
扩展方法的一部分,而不是该框架所基于的反应式扩展的一部分。所以你不能不使用那个框架就写出像 myObservable.Subscribe(x => ...).DisposeWith(compDisp)
这样的东西,但是 compDisp.Add(myObservable.Subscribe(x => ...))
应该可以。我对上面的代码做了相应的调整。
我有数量未知的订阅要一次性处理掉,因为它们可能会变得很多。有没有一种机制可以使用 System.Reactive
一次性处理掉它们?也许,将它们包装成 Observable.Using(() => Disposable.Create...
会起作用吗?
client.Streams.PongStream.Subscribe(x =>
Log.Information($"Pong received ({x.Message})"));
client.Streams.FundingStream.Subscribe(response =>
{
var funding = response.Data;
Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
$"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
$"index price {funding.IndexPrice}");
});
client.Streams.AggregateTradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
$"price: {trade.Price} size: {trade.Quantity}");
});
client.Streams.TradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
$"price: {trade.Price} size: {trade.Quantity}");
});
client.Streams.OrderBookPartialStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book snapshot [{ob.Symbol}] " +
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
Task.Delay(500).Wait();
//OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
});
client.Streams.OrderBookDiffStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book diff [{ob.Symbol}] " +
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
});
client.Streams.BookTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Book ticker [{ob.Symbol}] " +
$"Best ask price: {ob.BestAskPrice} " +
$"Best ask qty: {ob.BestAskQty} " +
$"Best bid price: {ob.BestBidPrice} " +
$"Best bid qty: {ob.BestBidQty}");
});
client.Streams.KlineStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Kline [{ob.Symbol}] " +
$"Kline start time: {ob.Data.StartTime} " +
$"Kline close time: {ob.Data.CloseTime} " +
$"Interval: {ob.Data.Interval} " +
$"First trade ID: {ob.Data.FirstTradeId} " +
$"Last trade ID: {ob.Data.LastTradeId} " +
$"Open price: {ob.Data.OpenPrice} " +
$"Close price: {ob.Data.ClosePrice} " +
$"High price: {ob.Data.HighPrice} " +
$"Low price: {ob.Data.LowPrice} " +
$"Base asset volume: {ob.Data.BaseAssetVolume} " +
$"Number of trades: {ob.Data.NumberTrades} " +
$"Is this kline closed?: {ob.Data.IsClose} " +
$"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
$"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
$"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
$"Ignore: {ob.Data.Ignore} ");
});
client.Streams.MiniTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Mini-ticker [{ob.Symbol}] " +
$"Open price: {ob.OpenPrice} " +
$"Close price: {ob.ClosePrice} " +
$"High price: {ob.HighPrice} " +
$"Low price: {ob.LowPrice} " +
$"Base asset volume: {ob.BaseAssetVolume} " +
$"Quote asset volume: {ob.QuoteAssetVolume}");
});
以下是这些订阅的实际情况。
public class BinanceClientStreams
{
internal readonly Subject<PongResponse> PongSubject = new Subject<PongResponse>();
internal readonly Subject<TradeResponse> TradesSubject = new Subject<TradeResponse>();
internal readonly Subject<AggregatedTradeResponse> TradeBinSubject = new Subject<AggregatedTradeResponse>();
internal readonly Subject<OrderBookPartialResponse> OrderBookPartialSubject =
new Subject<OrderBookPartialResponse>();
internal readonly Subject<OrderBookDiffResponse> OrderBookDiffSubject = new Subject<OrderBookDiffResponse>();
internal readonly Subject<FundingResponse> FundingSubject = new Subject<FundingResponse>();
internal readonly Subject<BookTickerResponse> BookTickerSubject = new Subject<BookTickerResponse>();
internal readonly Subject<KlineResponse> KlineSubject = new Subject<KlineResponse>();
internal readonly Subject<MiniTickerResponse> MiniTickerSubject = new Subject<MiniTickerResponse>();
// PUBLIC
/// <summary>
/// Response stream to every ping request
/// </summary>
public IObservable<PongResponse> PongStream => PongSubject.AsObservable();
/// <summary>
/// Trades stream - emits every executed trade on Binance
/// </summary>
public IObservable<TradeResponse> TradesStream => TradesSubject.AsObservable();
/// <summary>
/// Chunk of trades - emits grouped trades together
/// </summary>
public IObservable<AggregatedTradeResponse> AggregateTradesStream => TradeBinSubject.AsObservable();
/// <summary>
/// Partial order book stream - emits small snapshot of the order book
/// </summary>
public IObservable<OrderBookPartialResponse> OrderBookPartialStream => OrderBookPartialSubject.AsObservable();
/// <summary>
/// Order book difference stream - emits small snapshot of the order book
/// </summary>
public IObservable<OrderBookDiffResponse> OrderBookDiffStream => OrderBookDiffSubject.AsObservable();
/// <summary>
/// Mark price and funding rate stream - emits mark price and funding rate for a single symbol pushed every 3 seconds or every second
/// </summary>
public IObservable<FundingResponse> FundingStream => FundingSubject.AsObservable();
/// <summary>
/// The best bid or ask's price or quantity in real-time for a specified symbol
/// </summary>
public IObservable<BookTickerResponse> BookTickerStream => BookTickerSubject.AsObservable();
/// <summary>
/// The Kline/Candlestick subscription, provide symbol and chart intervals
/// </summary>
public IObservable<KlineResponse> KlineStream => KlineSubject.AsObservable();
/// <summary>
/// Mini-ticker specified symbol statistics for the previous 24hrs
/// </summary>
public IObservable<MiniTickerResponse> MiniTickerStream => MiniTickerSubject.AsObservable();
}
我想你要找的是 CompositeDisposable
。您需要创建该实例 class 并将您的所有订阅添加到其中。
var compDisp = new CompositeDisposable();
compDisp.Add(client.Streams.PongStream.Subscribe(x =>
Log.Information($"Pong received ({x.Message})")));
compDisp.Add(client.Streams.FundingStream.Subscribe(response =>
{
var funding = response.Data;
Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
$"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
$"index price {funding.IndexPrice}");
}));
compDisp.Add(client.Streams.AggregateTradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
$"price: {trade.Price} size: {trade.Quantity}");
}));
compDisp.Add(client.Streams.TradesStream.Subscribe(response =>
{
var trade = response.Data;
Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
$"price: {trade.Price} size: {trade.Quantity}");
}));
compDisp.Add(client.Streams.OrderBookPartialStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book snapshot [{ob.Symbol}] " +
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
Task.Delay(500).Wait();
//OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
}));
compDisp.Add(client.Streams.OrderBookDiffStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Order book diff [{ob.Symbol}] " +
$"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
$"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
}));
compDisp.Add(client.Streams.BookTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Book ticker [{ob.Symbol}] " +
$"Best ask price: {ob.BestAskPrice} " +
$"Best ask qty: {ob.BestAskQty} " +
$"Best bid price: {ob.BestBidPrice} " +
$"Best bid qty: {ob.BestBidQty}");
}));
compDisp.Add(client.Streams.KlineStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Kline [{ob.Symbol}] " +
$"Kline start time: {ob.Data.StartTime} " +
$"Kline close time: {ob.Data.CloseTime} " +
$"Interval: {ob.Data.Interval} " +
$"First trade ID: {ob.Data.FirstTradeId} " +
$"Last trade ID: {ob.Data.LastTradeId} " +
$"Open price: {ob.Data.OpenPrice} " +
$"Close price: {ob.Data.ClosePrice} " +
$"High price: {ob.Data.HighPrice} " +
$"Low price: {ob.Data.LowPrice} " +
$"Base asset volume: {ob.Data.BaseAssetVolume} " +
$"Number of trades: {ob.Data.NumberTrades} " +
$"Is this kline closed?: {ob.Data.IsClose} " +
$"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
$"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
$"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
$"Ignore: {ob.Data.Ignore} ");
}));
compDisp.Add(client.Streams.MiniTickerStream.Subscribe(response =>
{
var ob = response.Data;
Log.Information($"Mini-ticker [{ob.Symbol}] " +
$"Open price: {ob.OpenPrice} " +
$"Close price: {ob.ClosePrice} " +
$"High price: {ob.HighPrice} " +
$"Low price: {ob.LowPrice} " +
$"Base asset volume: {ob.BaseAssetVolume} " +
$"Quote asset volume: {ob.QuoteAssetVolume}");
}));
一旦 compDisp
实例被处置,所有订阅将被处置。当然,何时完成取决于您应用程序的上下文。
编辑:
根据您的应用程序架构,WhenActivated
扩展方法也可能对您感兴趣。它在 ActivatableView
和 ActivatableViewModel
界面上定义,并接受每次视图(模型)被激活时调用的函数(即基本上当它显示在屏幕上时)。此函数还有一个 CompositeDisposable
作为参数,每次视图(模型)被停用时都会配置该参数。
编辑 2
刚刚意识到 DiposeWith
方法实际上是 ReactiveUI
框架和 WhenAcitvated
扩展方法的一部分,而不是该框架所基于的反应式扩展的一部分。所以你不能不使用那个框架就写出像 myObservable.Subscribe(x => ...).DisposeWith(compDisp)
这样的东西,但是 compDisp.Add(myObservable.Subscribe(x => ...))
应该可以。我对上面的代码做了相应的调整。