Enumerable.Range / Observable.FromAsync 上的 Reactive Extensions OperationCancelled 异常
Reactive Extensions OperationCancelled exception on Enumerable.Range / Observable.FromAsync
我有以下代码,它从分页的 REST 中提取数据 API。
当使用响应式扩展时,它接近下载结束(即已知目标 1,653 页中的第 1,636 页,它到达的确切计数取决于并发提取,并发性更高导致页面计数更少已知目标),然后我的接收函数抛出一个 OperationCancelled
异常(但我从未设置我的取消令牌源)。
这就像 Select
正在以某种方式取消我的功能,但只是在分页请求快结束时或 observable 终止然后杀死我的 observable,我认为(但对 rx.net 来说是新的) .
这也不是速率限制问题,一次下载一个(MaxConcurrentDownloads
设置为 1)。
任何想法我做错了什么请在下面?
using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;
// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;
var query = Enumerable
.Range(2, pages - 1)
.ToObservable()
.Select(page => Observable.FromAsync(() =>
{
return api
.GetTickersAsync(BatchSize, page, this.cts.Token)
.ContinueWith( x => new TickersResponseWithPage(page, x.Result));
}))
.Merge(MaxConcurrentDownloads);
query.Subscribe((response) =>
{
this.logger.LogInformation($"adding {response.TickersResponse.Tickers.Length} records from page {response.Page}");
list.AddRange(response.TickersResponse.Tickers);
});
await query.ToTask(this.cts.Token);
var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count:n0} in {duration.Humanize()}");
如果需要其他信息,顺序测试证明对 API 的调用是正确的并且 return 所有 1,653 页
using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;
// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;
// read from second page
for (var page = 2; page <= pages && this.cts.Token.IsCancellationRequested == false; page++)
{
response = await api.GetTickersAsync(BatchSize, page, this.cts.Token);
list.AddRange(response.Tickers);
this.logger.LogInformation($"adding {response.Tickers.Length} records from page {page}");
}
var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count} in {duration.Humanize()}");
更新
我修改了以下内容以停止重复页面并且似乎已经解决了重复问题:
IObservable<IList<TickerV2>> query =
Observable
.Using(
() => new HttpClient(),
hc =>
from first_response in Observable.FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, 1, ct))
let pages = (first_response.Count + BatchSize - 1) / BatchSize
from trwp in
Observable
.Range(2, pages - 1)
.Select(page =>
Observable
.FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, page, ct))
.Select(r => new TickersResponseWithPage(page, r)))
.Merge(MaxConcurrentDownloads)
.StartWith(new TickersResponseWithPage(1, first_response))
from tv2 in trwp.TickersResponse.Tickers
select tv2)
.ToList();
list = await query.ToTask(this.cts.Token);
您将同步代码、可枚举、Rx 和任务进行了大量奇怪的混合。所有这些都会在调试时造成一团糟。您应该选择一个 monad 并一直留在其中 - 不要混合使用它们。
你能试试你的代码的这个纯 Rx 版本,让我知道你得到什么样的结果吗?请附加到问题的末尾,不要更改那里的内容。
IObservable<IList<TickerV2>> query =
Observable
.Using(
() => new HttpClient(),
hc =>
from first_response in Observable.FromAsync(ct => api.GetTickersAsync(BatchSize, 1, ct))
let pages = (first_response.Count + BatchSize - 1) / BatchSize
from trwp in
Observable
.Range(2, pages - 1)
.SelectMany(page =>
Observable
.FromAsync(ct => api.GetTickersAsync(BatchSize, page, ct))
.Select(r => new TickersResponseWithPage(page, r)))
.StartWith(new TickersResponseWithPage(1, first_response))
from tv2 in trwp.TickersResponse.Tickers
select tv2)
.ToList();
IList<TickerV2> list = await query;
以下是在 Defer
中创建 api
对象的方法:
IObservable<IList<TickerV2>> query =
Observable
.Defer(() =>
{
var api = new PolygonWebApi(httpClient, this.apiKey);
return
Observable
.Using(... as above ...)
.ToList();
});
我有以下代码,它从分页的 REST 中提取数据 API。
当使用响应式扩展时,它接近下载结束(即已知目标 1,653 页中的第 1,636 页,它到达的确切计数取决于并发提取,并发性更高导致页面计数更少已知目标),然后我的接收函数抛出一个 OperationCancelled
异常(但我从未设置我的取消令牌源)。
这就像 Select
正在以某种方式取消我的功能,但只是在分页请求快结束时或 observable 终止然后杀死我的 observable,我认为(但对 rx.net 来说是新的) .
这也不是速率限制问题,一次下载一个(MaxConcurrentDownloads
设置为 1)。
任何想法我做错了什么请在下面?
using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;
// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;
var query = Enumerable
.Range(2, pages - 1)
.ToObservable()
.Select(page => Observable.FromAsync(() =>
{
return api
.GetTickersAsync(BatchSize, page, this.cts.Token)
.ContinueWith( x => new TickersResponseWithPage(page, x.Result));
}))
.Merge(MaxConcurrentDownloads);
query.Subscribe((response) =>
{
this.logger.LogInformation($"adding {response.TickersResponse.Tickers.Length} records from page {response.Page}");
list.AddRange(response.TickersResponse.Tickers);
});
await query.ToTask(this.cts.Token);
var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count:n0} in {duration.Humanize()}");
如果需要其他信息,顺序测试证明对 API 的调用是正确的并且 return 所有 1,653 页
using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;
// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;
// read from second page
for (var page = 2; page <= pages && this.cts.Token.IsCancellationRequested == false; page++)
{
response = await api.GetTickersAsync(BatchSize, page, this.cts.Token);
list.AddRange(response.Tickers);
this.logger.LogInformation($"adding {response.Tickers.Length} records from page {page}");
}
var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count} in {duration.Humanize()}");
更新
我修改了以下内容以停止重复页面并且似乎已经解决了重复问题:
IObservable<IList<TickerV2>> query =
Observable
.Using(
() => new HttpClient(),
hc =>
from first_response in Observable.FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, 1, ct))
let pages = (first_response.Count + BatchSize - 1) / BatchSize
from trwp in
Observable
.Range(2, pages - 1)
.Select(page =>
Observable
.FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, page, ct))
.Select(r => new TickersResponseWithPage(page, r)))
.Merge(MaxConcurrentDownloads)
.StartWith(new TickersResponseWithPage(1, first_response))
from tv2 in trwp.TickersResponse.Tickers
select tv2)
.ToList();
list = await query.ToTask(this.cts.Token);
您将同步代码、可枚举、Rx 和任务进行了大量奇怪的混合。所有这些都会在调试时造成一团糟。您应该选择一个 monad 并一直留在其中 - 不要混合使用它们。
你能试试你的代码的这个纯 Rx 版本,让我知道你得到什么样的结果吗?请附加到问题的末尾,不要更改那里的内容。
IObservable<IList<TickerV2>> query =
Observable
.Using(
() => new HttpClient(),
hc =>
from first_response in Observable.FromAsync(ct => api.GetTickersAsync(BatchSize, 1, ct))
let pages = (first_response.Count + BatchSize - 1) / BatchSize
from trwp in
Observable
.Range(2, pages - 1)
.SelectMany(page =>
Observable
.FromAsync(ct => api.GetTickersAsync(BatchSize, page, ct))
.Select(r => new TickersResponseWithPage(page, r)))
.StartWith(new TickersResponseWithPage(1, first_response))
from tv2 in trwp.TickersResponse.Tickers
select tv2)
.ToList();
IList<TickerV2> list = await query;
以下是在 Defer
中创建 api
对象的方法:
IObservable<IList<TickerV2>> query =
Observable
.Defer(() =>
{
var api = new PolygonWebApi(httpClient, this.apiKey);
return
Observable
.Using(... as above ...)
.ToList();
});