Rx 扩展 Parallel.ForEach 节流
Rx extensions Parallel.ForEach throttling
我正在关注这个问题的答案:Rx extensions: Where is Parallel.ForEach? 以便 运行 使用 Rx 并行执行多个操作。
我 运行 遇到的问题是它似乎正在为 每个 请求分配一个新线程,而使用 Parallel.ForEach
更少。
我 运行 并行处理的进程非常占用内存,所以如果我试图一次处理数百个项目,那么提供给链接问题的答案很快就会出现 运行内存不足。
有没有一种方法可以修改该答案以限制在任何给定时间完成的项目数量?
我查看了 Window
和 Buffer
操作,我的代码如下所示:
return inputs.Select(i => new AccountViewModel(i))
.ToObservable()
.ObserveOn(RxApp.MainThreadScheduler)
.ToList()
.Do(l =>
{
using (Accounts.SuppressChangeNotifications())
{
Accounts.AddRange(l);
}
})
.SelectMany(x => x)
.SelectMany(acc => Observable.StartAsync(async () =>
{
var res = await acc.ProcessAsync(config, m, outputPath);
processed++;
var prog = ((double) processed/inputs.Count())*100.0;
OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog));
OverallProgress.Progress.OnNext(prog);
return res;
}))
.All(x => x);
理想情况下,我希望能够将其分批处理成大块的帐户视图模型,然后调用 ProcessAsync
方法,只有在所有批处理完成后才能继续。
理想情况下,即使只有一个批次完成,它也会继续前进,但只会保持相同的批次大小。
因此,如果我有一批 5 件和 1 件成品,我想开始另一件,但只有一件直到更多 space 可用。
像往常一样,Paul Betts 已经回答了一个解决我问题的类似问题:
问题:Reactive Extensions Parallel processing based on specific number
有一些关于使用 Observable.Defer
然后合并成批次的信息,使用它我修改了我以前的代码,如下所示:
return inputs.Select(i => new AccountViewModel(i))
.ToObservable()
.ObserveOn(RxApp.MainThreadScheduler)
.ToList()
.Do(l =>
{
using (Accounts.SuppressChangeNotifications())
{
Accounts.AddRange(l);
}
})
.SelectMany(x => x)
.Select(x => Observable.DeferAsync(async _ =>
{
var res = await x.ProcessAsync(config, m, outputPath);
processed++;
var prog = ((double) processed/inputs.Count())*100.0;
OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog));
OverallProgress.Progress.OnNext(prog);
return Observable.Return(res);
}))
.Merge(5)
.All(x => x);
果然,我得到了滚动完成行为(例如,如果 1/5 完成,那么只有一个开始)。
显然我还有一些基础知识要掌握,但这太棒了!
我正在关注这个问题的答案:Rx extensions: Where is Parallel.ForEach? 以便 运行 使用 Rx 并行执行多个操作。
我 运行 遇到的问题是它似乎正在为 每个 请求分配一个新线程,而使用 Parallel.ForEach
更少。
我 运行 并行处理的进程非常占用内存,所以如果我试图一次处理数百个项目,那么提供给链接问题的答案很快就会出现 运行内存不足。
有没有一种方法可以修改该答案以限制在任何给定时间完成的项目数量?
我查看了 Window
和 Buffer
操作,我的代码如下所示:
return inputs.Select(i => new AccountViewModel(i))
.ToObservable()
.ObserveOn(RxApp.MainThreadScheduler)
.ToList()
.Do(l =>
{
using (Accounts.SuppressChangeNotifications())
{
Accounts.AddRange(l);
}
})
.SelectMany(x => x)
.SelectMany(acc => Observable.StartAsync(async () =>
{
var res = await acc.ProcessAsync(config, m, outputPath);
processed++;
var prog = ((double) processed/inputs.Count())*100.0;
OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog));
OverallProgress.Progress.OnNext(prog);
return res;
}))
.All(x => x);
理想情况下,我希望能够将其分批处理成大块的帐户视图模型,然后调用 ProcessAsync
方法,只有在所有批处理完成后才能继续。
理想情况下,即使只有一个批次完成,它也会继续前进,但只会保持相同的批次大小。
因此,如果我有一批 5 件和 1 件成品,我想开始另一件,但只有一件直到更多 space 可用。
像往常一样,Paul Betts 已经回答了一个解决我问题的类似问题:
问题:Reactive Extensions Parallel processing based on specific number
有一些关于使用 Observable.Defer
然后合并成批次的信息,使用它我修改了我以前的代码,如下所示:
return inputs.Select(i => new AccountViewModel(i))
.ToObservable()
.ObserveOn(RxApp.MainThreadScheduler)
.ToList()
.Do(l =>
{
using (Accounts.SuppressChangeNotifications())
{
Accounts.AddRange(l);
}
})
.SelectMany(x => x)
.Select(x => Observable.DeferAsync(async _ =>
{
var res = await x.ProcessAsync(config, m, outputPath);
processed++;
var prog = ((double) processed/inputs.Count())*100.0;
OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog));
OverallProgress.Progress.OnNext(prog);
return Observable.Return(res);
}))
.Merge(5)
.All(x => x);
果然,我得到了滚动完成行为(例如,如果 1/5 完成,那么只有一个开始)。
显然我还有一些基础知识要掌握,但这太棒了!