从 rx.net Observable.FromAsync 捕获异常
Catch exception from rx.net Observable.FromAsync
我有以下 Reactive Extensions Subject
并且需要记录异常,还需要彻底关闭在 Observable.FromAsync
上创建的这个可能阻塞的异步任务。
关于这个和取消令牌,异常 TaskCanceledException
将被等待令牌的任何东西抛出。
我如何捕获这些异常 - 忽略关机时预期的 TaskCanceledException
,并记录其余的?
internal sealed class TradeAggregator : IDisposable
{
private readonly Subject<TradeExecuted> feed = new();
private readonly CancellationTokenSource cts = new();
private bool isStopped;
private bool isDisposed;
public TradeAggregator(Func<TradeAggregate, CancellationToken, Task> dispatch)
{
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
.SelectMany(x => x.ToList())
.Select(trades => Observable.FromAsync(() => dispatch(AggregateTrades(trades), cts.Token)))
.Concat() // Ensure that the results are serialized.
.Subscribe(cts.Token); // Check status of calls.
}
private TradeAggregate AggregateTrades(IEnumerable<TradeExecuted> trades)
{
// Do stuff.
return new TradeAggregate();
}
public void OnNext(ExecutedTrade trade) => this.feed.OnNext(trade);
public void Stop()
{
if (isStopped) return;
isStopped = true;
cts.Cancel();
}
public void Dispose()
{
if (isDisposed) return;
isDisposed = true;
Stop();
feed.Dispose();
cts.Dispose();
}
}
使用不同的 Subscribe
重载:
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
.SelectMany(x => x.ToList())
.Select(trades => Observable.FromAsync(() => dispatch(AggregateTrades(trades), cts.Token)))
.Concat() // Ensure that the results are serialized.
.Subscribe(
x => {}, //OnNext, do nothing
e => { //OnError, handle
if(e.GetType() == typeof(TaskCanceledException))
; //ignore
else
{
; //log
}
},
cts.Token
);
我有以下 Reactive Extensions Subject
并且需要记录异常,还需要彻底关闭在 Observable.FromAsync
上创建的这个可能阻塞的异步任务。
关于这个和取消令牌,异常 TaskCanceledException
将被等待令牌的任何东西抛出。
我如何捕获这些异常 - 忽略关机时预期的 TaskCanceledException
,并记录其余的?
internal sealed class TradeAggregator : IDisposable
{
private readonly Subject<TradeExecuted> feed = new();
private readonly CancellationTokenSource cts = new();
private bool isStopped;
private bool isDisposed;
public TradeAggregator(Func<TradeAggregate, CancellationToken, Task> dispatch)
{
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
.SelectMany(x => x.ToList())
.Select(trades => Observable.FromAsync(() => dispatch(AggregateTrades(trades), cts.Token)))
.Concat() // Ensure that the results are serialized.
.Subscribe(cts.Token); // Check status of calls.
}
private TradeAggregate AggregateTrades(IEnumerable<TradeExecuted> trades)
{
// Do stuff.
return new TradeAggregate();
}
public void OnNext(ExecutedTrade trade) => this.feed.OnNext(trade);
public void Stop()
{
if (isStopped) return;
isStopped = true;
cts.Cancel();
}
public void Dispose()
{
if (isDisposed) return;
isDisposed = true;
Stop();
feed.Dispose();
cts.Dispose();
}
}
使用不同的 Subscribe
重载:
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
.SelectMany(x => x.ToList())
.Select(trades => Observable.FromAsync(() => dispatch(AggregateTrades(trades), cts.Token)))
.Concat() // Ensure that the results are serialized.
.Subscribe(
x => {}, //OnNext, do nothing
e => { //OnError, handle
if(e.GetType() == typeof(TaskCanceledException))
; //ignore
else
{
; //log
}
},
cts.Token
);