Reactive Extensions OperationCanceledException on Enumerable.Range / Observable.FromAsync

I have the following code, which pulls data from a paginated REST API.

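When I use Reactive Extensions, it gets close to the end of the download (for example page 1,636 of a known target of 1,653 pages; the exact count it reaches depends on the concurrency, and higher concurrency stops further short of the known target), and then my receiving function throws an OperationCanceledException, even though I never cancel my CancellationTokenSource.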

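It is as if Select is somehow cancelling my function, but only near the end of the paged requests, or as if the outer observable terminates and then kills my inner observables. That is my guess, but I am new to Rx.NET.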

It is not a rate-limiting issue either: the same thing happens when downloading one page at a time (MaxConcurrentDownloads set to 1).

Any ideas what I am doing wrong in the code below?

using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;

// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;
                
var query = Enumerable
                .Range(2, pages - 1)
                .ToObservable()
                .Select(page => Observable.FromAsync(() =>
                {
                    return api
                            .GetTickersAsync(BatchSize, page, this.cts.Token)
                            .ContinueWith( x => new TickersResponseWithPage(page, x.Result));
                }))
                .Merge(MaxConcurrentDownloads);

query.Subscribe((response) =>
{
    this.logger.LogInformation($"adding {response.TickersResponse.Tickers.Length} records from page {response.Page}");
    list.AddRange(response.TickersResponse.Tickers);
});
await query.ToTask(this.cts.Token);
                
var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count:n0} in {duration.Humanize()}");   

In case more information helps, a sequential test shows that the calls to the API are correct and return all 1,653 pages:

using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;


// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;

// read from second page
for (var page = 2; page <= pages && this.cts.Token.IsCancellationRequested == false; page++)
{
    response = await api.GetTickersAsync(BatchSize, page, this.cts.Token);
    list.AddRange(response.Tickers);
    this.logger.LogInformation($"adding {response.Tickers.Length} records from page {page}");
}

var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count} in {duration.Humanize()}");

Update

I changed the code to the following to stop pages being duplicated, and it seems to have solved the duplication problem:

IObservable<IList<TickerV2>> query =
    Observable
        .Using(
            () => new HttpClient(),
            hc =>
                from first_response in Observable.FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, 1, ct))
                let pages = (first_response.Count + BatchSize - 1) / BatchSize
                from trwp in
                    Observable
                        .Range(2, pages - 1)
                        .Select(page =>
                            Observable
                                .FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, page, ct))
                                .Select(r => new TickersResponseWithPage(page, r)))
                        .Merge(MaxConcurrentDownloads)
                        .StartWith(new TickersResponseWithPage(1, first_response))
                from tv2 in trwp.TickersResponse.Tickers
                select tv2)
        .ToList();

list = await query.ToTask(this.cts.Token);

You have a strange mix of synchronous code, enumerables, Rx, and tasks. All of that makes a mess when debugging. You should pick one monad and stay in it; do not mix them.
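
One concrete consequence of the mixing can be sketched as follows. This is a minimal illustration, not a diagnosis of the exception: it assumes the System.Reactive package, and FetchPageAsync is a hypothetical stand-in for api.GetTickersAsync that only counts its invocations. Observable.FromAsync is deferred, so calling both query.Subscribe(...) and query.ToTask(...) on the same query runs the whole pipeline twice.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class DoubleSubscriptionDemo
{
    private static int calls;

    // Hypothetical stand-in for api.GetTickersAsync: counts how often it is invoked.
    private static Task<int> FetchPageAsync(int page)
    {
        Interlocked.Increment(ref calls);
        return Task.FromResult(page * 10);
    }

    public static async Task<int> RunAsync()
    {
        Interlocked.Exchange(ref calls, 0);

        var query = Observable
            .Range(2, 3) // pages 2, 3 and 4
            .Select(page => Observable.FromAsync(() => FetchPageAsync(page)))
            .Merge(1);

        // First subscription, as in query.Subscribe(...): one full run of the pipeline.
        var firstRunDone = new TaskCompletionSource<bool>();
        using var subscription = query.Subscribe(
            _ => { },
            ex => firstRunDone.TrySetException(ex),
            () => firstRunDone.TrySetResult(true));

        // Second subscription, as in await query.ToTask(...): a second, independent run.
        await query.ToTask();
        await firstRunDone.Task;

        return calls; // each of the 3 pages was requested twice
    }
}
```

Under these assumptions RunAsync returns 6 rather than 3, which matches the duplicated pages mentioned in the update. Building one query and awaiting it once avoids the second run.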

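Can you try this pure Rx version of your code and let me know what results you get? Please append them to the end of the question rather than changing what is already there.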

IObservable<IList<TickerV2>> query =
    Observable
        .Using(
            () => new HttpClient(),
            hc =>
                from first_response in Observable.FromAsync(ct => api.GetTickersAsync(BatchSize, 1, ct))
                let pages = (first_response.Count + BatchSize - 1) / BatchSize
                from trwp in
                    Observable
                        .Range(2, pages - 1)
                        .SelectMany(page =>
                            Observable
                                .FromAsync(ct => api.GetTickersAsync(BatchSize, page, ct))
                                .Select(r => new TickersResponseWithPage(page, r)))
                        .StartWith(new TickersResponseWithPage(1, first_response))
                from tv2 in trwp.TickersResponse.Tickers
                select tv2)
        .ToList();

IList<TickerV2> list = await query;
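
If you want to experiment without hitting the real API, the same shape can be run against a stub. The following is only a sketch under stated assumptions: it needs the System.Reactive package, and PageResponse, GetPageAsync and the constants are hypothetical names invented for the illustration. It uses Select followed by Merge(MaxConcurrentDownloads) instead of SelectMany, because SelectMany subscribes to every inner observable as soon as its page number arrives and so puts no cap on concurrency.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PagedDownloadSketch
{
    // Hypothetical response type; the real TickersResponse is not shown in the post.
    public sealed record PageResponse(int Count, string[] Items);

    private const int BatchSize = 3;
    private const int TotalItems = 10; // 10 items at 3 per page gives 4 pages
    private const int MaxConcurrentDownloads = 2;

    // Hypothetical stub standing in for GetTickersAsync(BatchSize, page, ct).
    private static async Task<PageResponse> GetPageAsync(int page, CancellationToken ct)
    {
        await Task.Delay(10, ct); // simulated latency; honours cancellation
        var items = Enumerable
            .Range((page - 1) * BatchSize, BatchSize)
            .Where(i => i < TotalItems)
            .Select(i => $"item{i}")
            .ToArray();
        return new PageResponse(TotalItems, items);
    }

    public static IObservable<IList<string>> BuildQuery() =>
        from first in Observable.FromAsync(ct => GetPageAsync(1, ct))
        let pages = (first.Count + BatchSize - 1) / BatchSize // ceiling division
        from all in Observable
            .Range(2, pages - 1)
            .Select(page => Observable.FromAsync(ct => GetPageAsync(page, ct)))
            .Merge(MaxConcurrentDownloads) // at most 2 requests in flight
            .StartWith(first)
            .SelectMany(r => r.Items)
            .ToList()
        select all;
}
```

Usage would be IList<string> items = await PagedDownloadSketch.BuildQuery(); which subscribes exactly once. Note that Merge emits pages in arrival order, so items after the first page are not guaranteed to come back in page order.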

Here is how to create the api object inside a Defer:

IObservable<IList<TickerV2>> query =
    Observable
        .Defer(() =>
        {
            var api = new PolygonWebApi(httpClient, this.apiKey);
            return
                Observable
                    .Using(... as above ...)
                    .ToList();
        });
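
To show what Defer buys you, here is a small sketch, again assuming the System.Reactive package. FakeApi is a hypothetical stand-in for PolygonWebApi that only counts how many times it is constructed. The factory passed to Defer runs once per subscription, so nothing is created when the query is built, and each subscriber gets its own api object.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class DeferSketch
{
    // Hypothetical stand-in for PolygonWebApi; counts constructions.
    public sealed class FakeApi
    {
        public static int Created;

        public FakeApi() => Created++;

        public Task<int> GetAsync(int page) => Task.FromResult(page);
    }

    public static IObservable<int> BuildQuery() =>
        Observable.Defer(() =>
        {
            // Runs on every subscription, not when BuildQuery is called.
            var api = new FakeApi();
            return Observable
                .Range(1, 3)
                .SelectMany(page => Observable.FromAsync(() => api.GetAsync(page)));
        });
}
```

Calling BuildQuery() alone leaves FakeApi.Created unchanged; awaiting the query once raises it by 1, and awaiting it a second time raises it by 1 again.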