Rx.Net - 以受限并发异步和并行处理组

Rx.Net - process groups asynchronously and in parallel with a constrained concurrency

正在玩 System.Reactive 尝试解决下一个任务 -

这是迄今为止我想到的最好的 -

TaskFactory taskFactory = new (new LimitedConcurrencyLevelTaskScheduler(2));
TaskPoolScheduler scheduler = new (taskFactory);
source
    .GroupBy(item => item)
    .SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).ObserveOn(scheduler).Concat())
    .Subscribe();

知道如何实现它 w/o 调度程序吗?无法通过 Merge()

使其工作

执行“最多只能同时处理 N 个组” 限制的最简单方法可能是使用 SemaphoreSlim。所以不是这个:

.SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).Concat())

...你可以这样做:

var semaphore = new SemaphoreSlim(N, N);

//...

.SelectMany(g => g.Select(item => Observable.FromAsync(async () =>
{
    await semaphore.WaitAsync();
    try { return await onNextAsync(item); }
    finally { semaphore.Release(); }
})).Merge(1))

顺便说一句,在 current Rx version (5.0.0) I don't trust 中使用 Concat 运算符,我更喜欢使用 Merge(1)

要使用专门的 Rx 工具解决这个问题,理想情况下你会喜欢这样的东西:

source
    .GroupBy(item => item.Key)
    .Select(group => group.Select(
        item => Observable.FromAsync(() => ProcessAsync(item))).Merge(1))
    .Merge(maxConcurrent: N)
    .Wait();

内部Merge(1)会在每个组内执行排他处理,外部Merge(N)会执行全局最大并发策略。不幸的是,这不起作用,因为外部 Merge(N) 将订阅限制为内部序列(IGroupedObservable<T>),而不是它们的各个元素。这不是你想要的。结果将是只处理前 N 个组,而忽略所有其他组的元素。 GroupBy 运算符创建热子序列,如果您不立即订阅它们,您将丢失元素。

为了使外部 Merge(N) 正常工作,您必须自由合并 Observable.FromAsync 生成的所有内部序列,并使用其他机制序列化每个组的处理。一个想法是实现一个特殊的 Select 运算符,它仅在前一个运算符完成后才发出 Observable.FromAsync。下面是一个基于 Zip 运算符的实现。 Zip 运算符在内部维护两个隐藏的缓冲区,因此它可以从两个序列中生成对,这两个序列可能会发出具有不同频率的元素。这种缓冲正是我们为了避免丢失元素所需要的。

private static IObservable<IObservable<TResult>> SelectOneByOne<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TResult>> selector)
{
    var subject = new BehaviorSubject<Unit>(default);
    var synchronizedSubject = Observer.Synchronize(subject);
    return source
        .Zip(subject, (item, _) => item)
        .Select(item => selector(item).Do(
            _ => { },
            _ => synchronizedSubject.OnNext(default),
            () => synchronizedSubject.OnNext(default)));
}

BehaviorSubject<T>最初包含一个元素,因此第一对将立即产生。在处理完第一个元素之前,不会生成第二对。第三对和第二个元素等同

然后您可以使用此运算符来解决问题,如下所示:

source
    .GroupBy(item => item.Key)
    .SelectMany(group => group.SelectOneByOne(
        item => Observable.FromAsync(() => ProcessAsync(item))))
    .Merge(maxConcurrent: N)
    .Wait();

以上解决方案仅用于回答问题。我不认为我会在生产环境中信任它。