System.Reactive - 一次处理未知数量的订阅

System.Reactive - Disposing unknown amount of subscriptions at once

我有数量未知的订阅要一次性处理掉,因为它们可能会变得很多。有没有一种机制可以使用 System.Reactive 一次性处理掉它们?也许,将它们包装成 Observable.Using(() => Disposable.Create... 会起作用吗?

client.Streams.PongStream.Subscribe(x =>
    Log.Information($"Pong received ({x.Message})"));

client.Streams.FundingStream.Subscribe(response =>
{
    var funding = response.Data;
    Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
                    $"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
                    $"index price {funding.IndexPrice}");
});

client.Streams.AggregateTradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
});

client.Streams.TradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
});

client.Streams.OrderBookPartialStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book snapshot [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
    Task.Delay(500).Wait();
    //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
});

client.Streams.OrderBookDiffStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book diff [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
});

client.Streams.BookTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Book ticker [{ob.Symbol}] " +
                    $"Best ask price: {ob.BestAskPrice} " +
                    $"Best ask qty: {ob.BestAskQty} " +
                    $"Best bid price: {ob.BestBidPrice} " +
                    $"Best bid qty: {ob.BestBidQty}");
});

client.Streams.KlineStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Kline [{ob.Symbol}] " +
                    $"Kline start time: {ob.Data.StartTime} " +
                    $"Kline close time: {ob.Data.CloseTime} " +
                    $"Interval: {ob.Data.Interval} " +
                    $"First trade ID: {ob.Data.FirstTradeId} " +
                    $"Last trade ID: {ob.Data.LastTradeId} " +
                    $"Open price: {ob.Data.OpenPrice} " +
                    $"Close price: {ob.Data.ClosePrice} " +
                    $"High price: {ob.Data.HighPrice} " +
                    $"Low price: {ob.Data.LowPrice} " +
                    $"Base asset volume: {ob.Data.BaseAssetVolume} " +
                    $"Number of trades: {ob.Data.NumberTrades} " +
                    $"Is this kline closed?: {ob.Data.IsClose} " +
                    $"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
                    $"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
                    $"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
                    $"Ignore: {ob.Data.Ignore} ");
});

client.Streams.MiniTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Mini-ticker [{ob.Symbol}] " +
                    $"Open price: {ob.OpenPrice} " +
                    $"Close price: {ob.ClosePrice} " +
                    $"High price: {ob.HighPrice} " +
                    $"Low price: {ob.LowPrice} " +
                    $"Base asset volume: {ob.BaseAssetVolume} " +
                    $"Quote asset volume: {ob.QuoteAssetVolume}");
});

以下是这些订阅的实际情况。

public class BinanceClientStreams
{
    internal readonly Subject<PongResponse> PongSubject = new Subject<PongResponse>();

    internal readonly Subject<TradeResponse> TradesSubject = new Subject<TradeResponse>();
    internal readonly Subject<AggregatedTradeResponse> TradeBinSubject = new Subject<AggregatedTradeResponse>();

    internal readonly Subject<OrderBookPartialResponse> OrderBookPartialSubject =
        new Subject<OrderBookPartialResponse>();

    internal readonly Subject<OrderBookDiffResponse> OrderBookDiffSubject = new Subject<OrderBookDiffResponse>();
    internal readonly Subject<FundingResponse> FundingSubject = new Subject<FundingResponse>();

    internal readonly Subject<BookTickerResponse> BookTickerSubject = new Subject<BookTickerResponse>();
    
    internal readonly Subject<KlineResponse> KlineSubject = new Subject<KlineResponse>();
    
    internal readonly Subject<MiniTickerResponse> MiniTickerSubject = new Subject<MiniTickerResponse>();
    
    // PUBLIC

    /// <summary>
    /// Response stream to every ping request
    /// </summary>
    public IObservable<PongResponse> PongStream => PongSubject.AsObservable();

    /// <summary>
    /// Trades stream - emits every executed trade on Binance
    /// </summary>
    public IObservable<TradeResponse> TradesStream => TradesSubject.AsObservable();

    /// <summary>
    /// Chunk of trades - emits grouped trades together
    /// </summary>
    public IObservable<AggregatedTradeResponse> AggregateTradesStream => TradeBinSubject.AsObservable();

    /// <summary>
    /// Partial order book stream - emits small snapshot of the order book
    /// </summary>
    public IObservable<OrderBookPartialResponse> OrderBookPartialStream => OrderBookPartialSubject.AsObservable();

    /// <summary>
    /// Order book difference stream - emits small snapshot of the order book
    /// </summary>
    public IObservable<OrderBookDiffResponse> OrderBookDiffStream => OrderBookDiffSubject.AsObservable();

    /// <summary>
    /// Mark price and funding rate stream - emits mark price and funding rate for a single symbol pushed every 3 seconds or every second
    /// </summary>
    public IObservable<FundingResponse> FundingStream => FundingSubject.AsObservable();

    /// <summary>
    ///  The best bid or ask's price or quantity in real-time for a specified symbol
    /// </summary>
    public IObservable<BookTickerResponse> BookTickerStream => BookTickerSubject.AsObservable();

    /// <summary>
    /// The Kline/Candlestick subscription, provide symbol and chart intervals
    /// </summary>
    public IObservable<KlineResponse> KlineStream => KlineSubject.AsObservable();

    /// <summary>
    /// Mini-ticker specified symbol statistics for the previous 24hrs
    /// </summary>
    public IObservable<MiniTickerResponse> MiniTickerStream => MiniTickerSubject.AsObservable();
}

我想你要找的是 CompositeDisposable。您需要创建该实例 class 并将您的所有订阅添加到其中。

var compDisp = new CompositeDisposable();

compDisp.Add(client.Streams.PongStream.Subscribe(x =>
    Log.Information($"Pong received ({x.Message})")));

compDisp.Add(client.Streams.FundingStream.Subscribe(response =>
{
    var funding = response.Data;
    Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
                    $"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
                    $"index price {funding.IndexPrice}");
}));

compDisp.Add(client.Streams.AggregateTradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
}));

compDisp.Add(client.Streams.TradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
}));

compDisp.Add(client.Streams.OrderBookPartialStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book snapshot [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
    Task.Delay(500).Wait();
    //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
}));

compDisp.Add(client.Streams.OrderBookDiffStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book diff [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
}));

compDisp.Add(client.Streams.BookTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Book ticker [{ob.Symbol}] " +
                    $"Best ask price: {ob.BestAskPrice} " +
                    $"Best ask qty: {ob.BestAskQty} " +
                    $"Best bid price: {ob.BestBidPrice} " +
                    $"Best bid qty: {ob.BestBidQty}");
}));

compDisp.Add(client.Streams.KlineStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Kline [{ob.Symbol}] " +
                    $"Kline start time: {ob.Data.StartTime} " +
                    $"Kline close time: {ob.Data.CloseTime} " +
                    $"Interval: {ob.Data.Interval} " +
                    $"First trade ID: {ob.Data.FirstTradeId} " +
                    $"Last trade ID: {ob.Data.LastTradeId} " +
                    $"Open price: {ob.Data.OpenPrice} " +
                    $"Close price: {ob.Data.ClosePrice} " +
                    $"High price: {ob.Data.HighPrice} " +
                    $"Low price: {ob.Data.LowPrice} " +
                    $"Base asset volume: {ob.Data.BaseAssetVolume} " +
                    $"Number of trades: {ob.Data.NumberTrades} " +
                    $"Is this kline closed?: {ob.Data.IsClose} " +
                    $"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
                    $"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
                    $"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
                    $"Ignore: {ob.Data.Ignore} ");
}));

compDisp.Add(client.Streams.MiniTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Mini-ticker [{ob.Symbol}] " +
                    $"Open price: {ob.OpenPrice} " +
                    $"Close price: {ob.ClosePrice} " +
                    $"High price: {ob.HighPrice} " +
                    $"Low price: {ob.LowPrice} " +
                    $"Base asset volume: {ob.BaseAssetVolume} " +
                    $"Quote asset volume: {ob.QuoteAssetVolume}");
}));

一旦 compDisp 实例被处置,所有订阅将被处置。当然,何时完成取决于您应用程序的上下文。

编辑: 根据您的应用程序架构,WhenActivated 扩展方法也可能对您感兴趣。它在 ActivatableViewActivatableViewModel 界面上定义,并接受每次视图(模型)被激活时调用的函数(即基本上当它显示在屏幕上时)。此函数还有一个 CompositeDisposable 作为参数,每次视图(模型)被停用时都会配置该参数。

编辑 2 刚刚意识到 DiposeWith 方法实际上是 ReactiveUI 框架和 WhenAcitvated 扩展方法的一部分,而不是该框架所基于的反应式扩展的一部分。所以你不能不使用那个框架就写出像 myObservable.Subscribe(x => ...).DisposeWith(compDisp) 这样的东西,但是 compDisp.Add(myObservable.Subscribe(x => ...)) 应该可以。我对上面的代码做了相应的调整。