How to batch write to database every N pages with ReactiveExtensions Range and WithAsync

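I have the following function, which works well... but given the amount of data it generates, I need to batch-write to the database every 10 or so pages fetched.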

var start = DateTime.Now;

IList<AggV2> list = null;

var lastRan = DateTime.UtcNow.AddMonths(-6); // get dummy date 6 months ago
var daysToRun = (DateTime.UtcNow - lastRan).Days;

try
{
    IObservable<IList<AggV2>> query =
        Observable
            .Using(
                () => new HttpClient(),
                hc =>
                    from day in
                        Observable
                            .Range(1, daysToRun)
                            .Select(day =>
                                Observable
                                    .FromAsync(ct => PolygonWebApi.GetGroupedDailyBarsAsync(hc, this.apiKey, Locale.US, Market.Stocks, lastRan.AddDays(day), false, ct))
                                    .Select(r =>
                                    {
                                        this.logger.LogInformation("got {0} records for {1}", r.Results.Count(), lastRan.AddDays(day));
                                        return new TickersResponseWithDay(lastRan.AddDays(day), r);
                                    }))
                            .Merge(MaxConcurrentDownloads)
                    from tv2 in day.AggregateResponse.Results
                    select tv2)
            .ToList();

    list = await query.ToTask(cancellationToken);
}
catch (OperationCanceledException) { }
catch (Exception e) { this.logger.LogError(e, e.Message); }

var duration = DateTime.Now - start;

if (cancellationToken.IsCancellationRequested)
    this.logger.LogInformation("{0} cancelled after {1}, database not updated", this.GetType().Name, duration.Humanize());
else
{
    this.logger.LogInformation("{0} downloaded {1} tickers in {2}, saving to database...", this.GetType().Name, list.Count, duration.Humanize());
    await SaveTickersToDatabaseAsync(list, cancellationToken);
}

Rather than fetching all the data and then writing it, I'd like to call SaveTickersToDatabaseAsync(list, cancellationToken) every 10 pages.

I also need to be able to exit the application at any point once cancellationToken is set.

Is it possible to combine the batching and cancellation requirements above?

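This is pretty simple with .Buffer(10). However, once SaveTickersToDatabaseAsync is pushed into the query (which is the right thing to do), the overall error handling and logging at the end of the method become more and more irrelevant. I'd suggest removing them and trying to put everything into the query.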
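A minimal standalone illustration of what `.Buffer(10)` does, assuming the System.Reactive NuGet package (the library the code here already uses); `BufferDemo` and `BatchSizes` are illustrative names, not part of the question's code. `Buffer(count)` emits a list every `count` items plus one final, shorter list when the source completes, so a trailing partial batch is still passed on to be saved.

```csharp
// Requires the System.Reactive NuGet package.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class BufferDemo
{
    // Groups itemCount items into lists of at most batchSize and returns the size of each list.
    public static IList<int> BatchSizes(int itemCount, int batchSize)
    {
        return Observable
            .Range(1, itemCount)
            .Buffer(batchSize)              // a list every batchSize items, plus the remainder on completion
            .Select(batch => batch.Count)
            .ToList()
            .Wait();                        // blocking is fine for a demo, not for production code
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", BatchSizes(25, 10))); // 10, 10, 5
    }
}
```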

The code would look like this:

IObservable<IList<Unit>> query =
    Observable
        .Using(
            () => new HttpClient(),
            hc =>
                from day in
                    Observable
                        .Range(1, daysToRun)
                        .Select(day =>
                            Observable
                                .FromAsync(ct => PolygonWebApi.GetGroupedDailyBarsAsync(hc, this.apiKey, Locale.US, Market.Stocks, lastRan.AddDays(day), false, ct))
                                .Do(r => this.logger.LogInformation("got {0} records for {1}", r.Results.Count(), lastRan.AddDays(day)))
                                .Select(r => new TickersResponseWithDay(lastRan.AddDays(day), r)))
                        .Merge(MaxConcurrentDownloads)
                from tv2 in day.AggregateResponse.Results
                select tv2)
        .Buffer(10)
        .SelectMany(xs => Observable.FromAsync(ct => SaveTickersToDatabaseAsync(xs, ct)))
        .ToList();

IList<Unit> list = await query.ToTask(cancellationToken);
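Note that in this query `.Buffer(10)` comes after `from tv2 in day.AggregateResponse.Results select tv2` has flattened the pages into individual records, so each batch holds 10 records rather than 10 pages. If batches of whole pages are wanted, one option is to buffer the per-day responses before flattening them. The sketch below assumes System.Reactive; `PageResult`, `FetchPageAsync` and `SaveBatchAsync` are hypothetical stand-ins for `TickersResponseWithDay`, `PolygonWebApi.GetGroupedDailyBarsAsync` and `SaveTickersToDatabaseAsync`. It also swaps `SelectMany` for `Select(...).Concat()` in the save step: `SelectMany` subscribes to each inner `FromAsync` as soon as its batch arrives, so several saves can overlap, whereas `Concat` starts the next save only after the previous one completes.

```csharp
// Requires the System.Reactive NuGet package. All types and methods here are hypothetical stand-ins.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed record PageResult(int Day, IReadOnlyList<string> Records);

public static class PageBatchDemo
{
    // Hypothetical page fetch: every day returns three fake records after a short delay.
    static async Task<PageResult> FetchPageAsync(int day, CancellationToken ct)
    {
        await Task.Delay(10, ct);
        return new PageResult(day, new[] { $"{day}-a", $"{day}-b", $"{day}-c" });
    }

    public static (IList<int> BatchSizes, int MaxConcurrentSaves) Run(int days, int pagesPerBatch, int maxConcurrentDownloads)
    {
        var gate = new object();
        var batchSizes = new List<int>();   // record count of each save call, in call order
        int active = 0, maxActive = 0;      // tracks how many saves run at the same time

        // Hypothetical save: records the batch size and pretends to write to the database.
        async Task SaveBatchAsync(IList<string> records, CancellationToken ct)
        {
            lock (gate)
            {
                active++;
                maxActive = Math.Max(maxActive, active);
                batchSizes.Add(records.Count);
            }
            await Task.Delay(20, ct);
            lock (gate) { active--; }
        }

        Observable
            .Range(1, days)
            .Select(day => Observable.FromAsync(ct => FetchPageAsync(day, ct)))
            .Merge(maxConcurrentDownloads)                                // pages arrive in completion order
            .Buffer(pagesPerBatch)                                        // batch whole pages, not records
            .Select(pages => pages.SelectMany(p => p.Records).ToList())   // flatten each batch of pages
            .Select(records => Observable.FromAsync(ct => SaveBatchAsync(records, ct)))
            .Concat()                                                     // one save at a time
            .ToList()
            .Wait();                                                      // demo only; don't block in real code

        lock (gate) return (batchSizes, maxActive);
    }

    public static void Main()
    {
        var (sizes, maxConcurrent) = Run(days: 25, pagesPerBatch: 10, maxConcurrentDownloads: 4);
        Console.WriteLine($"batches: {string.Join(", ", sizes)}; max concurrent saves: {maxConcurrent}");
    }
}
```

With 25 pages of 3 records each and 10 pages per batch, this makes three save calls with 30, 30 and 15 records. Whether serializing the writes matters depends on the database; if overlapping writes are acceptable, the original `SelectMany` works too.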

Now, you should also consider running the query with a normal Subscribe rather than query.ToTask(cancellationToken).
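When switching from `ToTask(cancellationToken)` to `Subscribe`, the token is no longer handed to Rx, so cancelling means disposing the subscription. Disposing it also cancels the `CancellationToken` that `Observable.FromAsync` passes to each in-flight call, which is what stops the pending HTTP requests and database writes. A minimal sketch, assuming System.Reactive, that ties an external token to the subscription with `CancellationToken.Register`; `CancellationDemo` is an illustrative name and the `FromAsync` body is a hypothetical stand-in for a long-running download or save.

```csharp
// Requires the System.Reactive NuGet package.
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Returns true if cancelling the external token cancelled the token that FromAsync handed to the work.
    public static bool CancellingTokenStopsQuery()
    {
        var started = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var innerCancelled = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Hypothetical long-running step standing in for a download or a database save.
        IObservable<Unit> query = Observable.FromAsync(async ct =>
        {
            ct.Register(() => innerCancelled.TrySetResult(true));
            started.TrySetResult(true);
            await Task.Delay(Timeout.Infinite, ct);
        });

        using var cts = new CancellationTokenSource();

        IDisposable subscription = query.Subscribe(
            _ => Console.WriteLine("saved a batch"),
            ex => Console.WriteLine($"failed: {ex.Message}"),
            () => Console.WriteLine("completed"));

        // Cancelling the token disposes the subscription, which in turn cancels ct inside FromAsync.
        using CancellationTokenRegistration registration = cts.Token.Register(() => subscription.Dispose());

        started.Task.Wait(TimeSpan.FromSeconds(5));
        cts.Cancel();
        return innerCancelled.Task.Wait(TimeSpan.FromSeconds(5));
    }

    public static void Main()
    {
        Console.WriteLine($"in-flight work cancelled: {CancellingTokenStopsQuery()}");
    }
}
```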

Your code would then look like this:

IObservable<Unit> query =
    Observable
        .Defer(() =>
        {
            var lastRan = DateTime.UtcNow.AddMonths(-6); // get dummy date 6 months ago
            var daysToRun = (DateTime.UtcNow - lastRan).Days;
            
            return
                Observable
                    .Using(
                        () => new HttpClient(),
                        hc =>
                            from day in
                                Observable
                                    .Range(1, daysToRun)
                                    .Select(day =>
                                        Observable
                                            .FromAsync(ct => PolygonWebApi.GetGroupedDailyBarsAsync(hc, this.apiKey, Locale.US, Market.Stocks, lastRan.AddDays(day), false, ct))
                                            .Do(r => this.logger.LogInformation("got {0} records for {1}", r.Results.Count(), lastRan.AddDays(day)))
                                            .Select(r => new TickersResponseWithDay(lastRan.AddDays(day), r)))
                                    .Merge(MaxConcurrentDownloads)
                            from tv2 in day.AggregateResponse.Results
                            select tv2)
                    .Buffer(10)
                    .SelectMany(xs => Observable.FromAsync(ct => SaveTickersToDatabaseAsync(xs, ct)));
        });

IDisposable subscription =
    query
        .Subscribe(
            x => { /* each call to `SaveTickersToDatabaseAsync` runs this code */  },
            ex => { /* an exception? then end here */ },
            () => { /* successfully completed */ });

This is clean and self-contained, and it's the Rx way of doing things.
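Part of what makes the last version self-contained is the `Observable.Defer` wrapper: `lastRan` and `daysToRun` are computed each time the query is subscribed, not once when it is built, so subscribing again later picks up the current date. A minimal sketch of the difference, assuming System.Reactive; `DeferDemo` is an illustrative name and the counter is a hypothetical stand-in for `DateTime.UtcNow.AddMonths(-6)`.

```csharp
// Requires the System.Reactive NuGet package.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DeferDemo
{
    public static (IList<int> Eager, IList<int> Deferred) Run()
    {
        int counter = 0;
        int NextValue() => ++counter; // hypothetical stand-in for computing lastRan from the clock

        // Evaluated once, when the query is built; every subscriber sees the same value.
        int captured = NextValue();
        IObservable<int> eager = Observable.Return(captured);

        // Evaluated inside Defer, so each subscription runs the factory again.
        IObservable<int> deferred = Observable.Defer(() => Observable.Return(NextValue()));

        var eagerResults = new List<int> { eager.Wait(), eager.Wait() };          // 1, 1
        var deferredResults = new List<int> { deferred.Wait(), deferred.Wait() }; // 2, 3
        return (eagerResults, deferredResults);
    }

    public static void Main()
    {
        var (eager, deferred) = Run();
        Console.WriteLine($"eager: {string.Join(", ", eager)}; deferred: {string.Join(", ", deferred)}");
    }
}
```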