从 rx.net Observable.FromAsync 捕获异常

Catch exception from rx.net Observable.FromAsync

我有下面这个基于 Reactive Extensions Subject 的类,需要记录其中的异常,并且需要干净地关闭通过 Observable.FromAsync 创建的、可能会阻塞的异步任务。

关于这一点以及取消令牌:任何正在等待该令牌的操作都会抛出 TaskCanceledException 异常。

我该如何捕获这些异常——忽略关闭时预期出现的 TaskCanceledException,并记录其余的异常?

/// <summary>
/// Groups incoming trades by (symbol, account, tenant, user), closes each group five seconds
/// after it was opened, aggregates the trades collected in the group and passes the aggregate
/// to the supplied dispatch callback. Dispatch calls are run one after another, never concurrently.
/// </summary>
internal sealed class TradeAggregator : IDisposable
{
    private readonly Subject<TradeExecuted> feed = new();
    private readonly CancellationTokenSource cts = new();
    private bool isStopped;
    private bool isDisposed;

    /// <summary>
    /// Builds the aggregation pipeline and subscribes to it.
    /// </summary>
    /// <param name="dispatch">
    /// Asynchronous callback invoked once per aggregated batch. It receives the aggregate and a
    /// token that is cancelled by <see cref="Stop"/>.
    /// </param>
    public TradeAggregator(Func<TradeAggregate, CancellationToken, Task> dispatch)
    {
        feed
            .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
            .SelectMany(x => x.ToList())
            .Select(trades => Observable.FromAsync(() => dispatch(AggregateTrades(trades), cts.Token)))
            .Concat() // Ensure that the dispatch calls are serialized.
            // The subscription lives until the token is cancelled. This overload takes no
            // onError handler, so a failing dispatch is not observed or logged here.
            .Subscribe(cts.Token);
    }

    /// <summary>
    /// Collapses one batch of trades into a single aggregate.
    /// </summary>
    /// <param name="trades">The trades collected for one group.</param>
    /// <returns>The aggregate that is handed to the dispatch callback.</returns>
    private TradeAggregate AggregateTrades(IEnumerable<TradeExecuted> trades)
    {
        // Do stuff.
        return new TradeAggregate();
    }

    /// <summary>
    /// Pushes a trade into the aggregation pipeline.
    /// </summary>
    /// <param name="trade">The executed trade to aggregate.</param>
    // The parameter type matches the element type of the subject (TradeExecuted).
    public void OnNext(TradeExecuted trade) => feed.OnNext(trade);

    /// <summary>
    /// Cancels the shared token, which ends the subscription and signals cancellation to any
    /// dispatch call currently using the token. Calling it more than once has no further effect.
    /// </summary>
    public void Stop()
    {
        if (isStopped)
        {
            return;
        }

        isStopped = true;
        cts.Cancel();
    }

    /// <summary>
    /// Stops the aggregator and releases the subject and the cancellation token source.
    /// Calling it more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        if (isDisposed)
        {
            return;
        }

        isDisposed = true;

        // Cancel first so the pipeline is shut down before its resources are released.
        Stop();
        feed.Dispose();
        cts.Dispose();
    }
}

使用另一个带有 onError 回调的 Subscribe 重载:

    feed
        .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
        .SelectMany(x => x.ToList())
        .Select(trades => Observable.FromAsync(() => dispatch(AggregateTrades(trades), cts.Token)))
        .Concat() // Ensure that the results are serialized.
        .Subscribe(
            _ => { }, // OnNext: nothing to do.
            e =>
            {
                // OnError: cancellation is expected on shutdown. Match the base type so that
                // both OperationCanceledException and the derived TaskCanceledException are
                // ignored, rather than only the exact TaskCanceledException type.
                if (e is OperationCanceledException)
                {
                    return;
                }

                // Log every other exception here.
            },
            cts.Token);