Rx.Net - 以受限并发异步和并行处理组
Rx.Net - process groups asynchronously and in parallel with a constrained concurrency
正在玩 System.Reactive 尝试解决下一个任务 -
- 将传入的字符串流分成组
- 每个组中的项目必须异步和顺序处理
- 组必须并行处理
- 最多只能同时处理N组
- 理想情况下,w/o 使用同步原语
这是迄今为止我想到的最好的 -
TaskFactory taskFactory = new (new LimitedConcurrencyLevelTaskScheduler(2));
TaskPoolScheduler scheduler = new (taskFactory);
source
.GroupBy(item => item)
.SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).ObserveOn(scheduler).Concat())
.Subscribe();
知道如何实现它 w/o 调度程序吗?无法通过 Merge()
使其工作
执行“最多只能同时处理 N 个组” 限制的最简单方法可能是使用 SemaphoreSlim
。所以不是这个:
.SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).Concat())
...你可以这样做:
var semaphore = new SemaphoreSlim(N, N);
//...
.SelectMany(g => g.Select(item => Observable.FromAsync(async () =>
{
await semaphore.WaitAsync();
try { return await onNextAsync(item); }
finally { semaphore.Release(); }
})).Merge(1))
顺便说一句,在 current Rx version (5.0.0) I don't trust 中使用 Concat
运算符,我更喜欢使用 Merge(1)
。
要使用专门的 Rx 工具解决这个问题,理想情况下你会喜欢这样的东西:
source
.GroupBy(item => item.Key)
.Select(group => group.Select(
item => Observable.FromAsync(() => ProcessAsync(item))).Merge(1))
.Merge(maxConcurrent: N)
.Wait();
内部Merge(1)
会在每个组内执行排他处理,外部Merge(N)
会执行全局最大并发策略。不幸的是,这不起作用,因为外部 Merge(N)
将订阅限制为内部序列(IGroupedObservable<T>
),而不是它们的各个元素。这不是你想要的。结果将是只处理前 N 个组,而忽略所有其他组的元素。 GroupBy
运算符创建热子序列,如果您不立即订阅它们,您将丢失元素。
为了使外部 Merge(N)
正常工作,您必须自由合并 Observable.FromAsync
生成的所有内部序列,并使用其他机制序列化每个组的处理。一个想法是实现一个特殊的 Select
运算符,它仅在前一个运算符完成后才发出 Observable.FromAsync
。下面是一个基于 Zip
运算符的实现。 Zip
运算符在内部维护两个隐藏的缓冲区,因此它可以从两个序列中生成对,这两个序列可能会发出具有不同频率的元素。这种缓冲正是我们为了避免丢失元素所需要的。
private static IObservable<IObservable<TResult>> SelectOneByOne<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, IObservable<TResult>> selector)
{
var subject = new BehaviorSubject<Unit>(default);
var synchronizedSubject = Observer.Synchronize(subject);
return source
.Zip(subject, (item, _) => item)
.Select(item => selector(item).Do(
_ => { },
_ => synchronizedSubject.OnNext(default),
() => synchronizedSubject.OnNext(default)));
}
BehaviorSubject<T>
最初包含一个元素,因此第一对将立即产生。在处理完第一个元素之前,不会生成第二对。第三对和第二个元素等同
然后您可以使用此运算符来解决问题,如下所示:
source
.GroupBy(item => item.Key)
.SelectMany(group => group.SelectOneByOne(
item => Observable.FromAsync(() => ProcessAsync(item))))
.Merge(maxConcurrent: N)
.Wait();
以上解决方案仅用于回答问题。我不认为我会在生产环境中信任它。
正在玩 System.Reactive 尝试解决下一个任务 -
- 将传入的字符串流分成组
- 每个组中的项目必须异步和顺序处理
- 组必须并行处理
- 最多只能同时处理N组
- 理想情况下,w/o 使用同步原语
这是迄今为止我想到的最好的 -
TaskFactory taskFactory = new (new LimitedConcurrencyLevelTaskScheduler(2));
TaskPoolScheduler scheduler = new (taskFactory);
source
.GroupBy(item => item)
.SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).ObserveOn(scheduler).Concat())
.Subscribe();
知道如何实现它 w/o 调度程序吗?无法通过 Merge()
使其工作执行“最多只能同时处理 N 个组” 限制的最简单方法可能是使用 SemaphoreSlim
。所以不是这个:
.SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).Concat())
...你可以这样做:
var semaphore = new SemaphoreSlim(N, N);
//...
.SelectMany(g => g.Select(item => Observable.FromAsync(async () =>
{
await semaphore.WaitAsync();
try { return await onNextAsync(item); }
finally { semaphore.Release(); }
})).Merge(1))
顺便说一句,在 current Rx version (5.0.0) I don't trust 中使用 Concat
运算符,我更喜欢使用 Merge(1)
。
要使用专门的 Rx 工具解决这个问题,理想情况下你会喜欢这样的东西:
source
.GroupBy(item => item.Key)
.Select(group => group.Select(
item => Observable.FromAsync(() => ProcessAsync(item))).Merge(1))
.Merge(maxConcurrent: N)
.Wait();
内部Merge(1)
会在每个组内执行排他处理,外部Merge(N)
会执行全局最大并发策略。不幸的是,这不起作用,因为外部 Merge(N)
将订阅限制为内部序列(IGroupedObservable<T>
),而不是它们的各个元素。这不是你想要的。结果将是只处理前 N 个组,而忽略所有其他组的元素。 GroupBy
运算符创建热子序列,如果您不立即订阅它们,您将丢失元素。
为了使外部 Merge(N)
正常工作,您必须自由合并 Observable.FromAsync
生成的所有内部序列,并使用其他机制序列化每个组的处理。一个想法是实现一个特殊的 Select
运算符,它仅在前一个运算符完成后才发出 Observable.FromAsync
。下面是一个基于 Zip
运算符的实现。 Zip
运算符在内部维护两个隐藏的缓冲区,因此它可以从两个序列中生成对,这两个序列可能会发出具有不同频率的元素。这种缓冲正是我们为了避免丢失元素所需要的。
private static IObservable<IObservable<TResult>> SelectOneByOne<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, IObservable<TResult>> selector)
{
var subject = new BehaviorSubject<Unit>(default);
var synchronizedSubject = Observer.Synchronize(subject);
return source
.Zip(subject, (item, _) => item)
.Select(item => selector(item).Do(
_ => { },
_ => synchronizedSubject.OnNext(default),
() => synchronizedSubject.OnNext(default)));
}
BehaviorSubject<T>
最初包含一个元素,因此第一对将立即产生。在处理完第一个元素之前,不会生成第二对。第三对和第二个元素等同
然后您可以使用此运算符来解决问题,如下所示:
source
.GroupBy(item => item.Key)
.SelectMany(group => group.SelectOneByOne(
item => Observable.FromAsync(() => ProcessAsync(item))))
.Merge(maxConcurrent: N)
.Wait();
以上解决方案仅用于回答问题。我不认为我会在生产环境中信任它。