如何在 Rx.NET 中以正确的方式约束并发
How to constraint concurrency the right way in Rx.NET
请注意以下代码片段:
var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList();
此代码的问题在于 getResultAsync
运行 以不受约束的方式并发。在某些情况下,这可能不是我们想要的。假设我想将其并发限制为最多 10 个并发调用。 Rx.NET 的方法是什么?
我附上了一个简单的控制台应用程序,它演示了主题和我对所描述问题的蹩脚解决方案。
有一些额外的代码,比如 Stats
class 和人工随机睡眠。它们的存在是为了确保我真正获得并发执行,并且可以可靠地计算在该过程中达到的最大并发数。
方法RunUnconstrained
体现天真无拘无束运行。方法 RunConstrained
显示了我的解决方案,这不是很优雅。理想情况下,我想通过简单地将专用 Rx 运算符应用于 Monad 来简化并发约束。当然,在不牺牲性能的情况下。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace RxConstrainedConcurrency
{
class Program
{
public class Stats
{
public int MaxConcurrentCount;
public int CurConcurrentCount;
public readonly object MaxConcurrentCountGuard = new object();
}
static void Main()
{
RunUnconstrained().GetAwaiter().GetResult();
RunConstrained().GetAwaiter().GetResult();
}
static async Task RunUnconstrained()
{
await Run(AsyncOp);
}
static async Task RunConstrained()
{
using (var sem = new SemaphoreSlim(10))
{
await Run(async (s, pause, stats) =>
{
// ReSharper disable AccessToDisposedClosure
await sem.WaitAsync();
try
{
return await AsyncOp(s, pause, stats);
}
finally
{
sem.Release();
}
// ReSharper restore AccessToDisposedClosure
});
}
}
static async Task Run(Func<string, int, Stats, Task<int>> getResultAsync)
{
var stats = new Stats();
var rnd = new Random(0x1234);
var result = await GetSource(1000).SelectMany(s => getResultAsync(s, rnd.Next(30), stats).ToObservable()).ToList();
Debug.Assert(stats.CurConcurrentCount == 0);
Debug.Assert(result.Count == 1000);
Debug.Assert(!result.Contains(0));
Debug.WriteLine("Max concurrency = " + stats.MaxConcurrentCount);
}
static IObservable<string> GetSource(int count)
{
return Enumerable.Range(1, count).Select(i => i.ToString()).ToObservable();
}
static Task<int> AsyncOp(string s, int pause, Stats stats)
{
return Task.Run(() =>
{
int cur = Interlocked.Increment(ref stats.CurConcurrentCount);
if (stats.MaxConcurrentCount < cur)
{
lock (stats.MaxConcurrentCountGuard)
{
if (stats.MaxConcurrentCount < cur)
{
stats.MaxConcurrentCount = cur;
}
}
}
try
{
Thread.Sleep(pause);
return int.Parse(s);
}
finally
{
Interlocked.Decrement(ref stats.CurConcurrentCount);
}
});
}
}
}
您可以在 Rx 中使用 Merge
的重载来执行此操作,该重载限制了对内部可观察对象的并发订阅数。
这种形式的Merge
应用于流的流。
通常,使用 SelectMany
从事件调用异步任务有两个作用:将每个事件投射到一个可观察流中,其单个事件是结果,并将所有结果流压平在一起。
要使用 Merge
我们必须使用常规 Select
将每个事件投射到异步任务的调用中,(从而创建一个流的流),并使用 Merge
压平结果。它将通过在任何时间点仅订阅提供的固定数量的内部流来以受限的方式执行此操作。
我们必须小心,仅在订阅包装内部流时调用每个异步任务调用。使用 ToObservable()
将异步任务转换为可观察对象实际上会立即调用异步任务,而不是在订阅时调用,因此我们必须 延迟 评估直到使用 [=19= 订阅].
这是一个将所有这些步骤放在一起的示例:
void Main()
{
var xs = Observable.Range(0, 10); // source events
// "Double" here is our async operation to be constrained,
// in this case to 3 concurrent invocations
xs.Select(x =>
Observable.Defer(() => Double(x).ToObservable())).Merge(3)
.Subscribe(Console.WriteLine,
() => Console.WriteLine("Max: " + MaxConcurrent));
}
private static int Concurrent;
private static int MaxConcurrent;
private static readonly object gate = new Object();
public async Task<int> Double(int x)
{
var concurrent = Interlocked.Increment(ref Concurrent);
lock(gate)
{
MaxConcurrent = Math.Max(concurrent, MaxConcurrent);
}
await Task.Delay(TimeSpan.FromSeconds(1));
Interlocked.Decrement(ref Concurrent);
return x * 2;
}
此处的最大并发输出将为“3”。删除 Merge to go "unconstrained" 你会得到“10”。
另一种(等效的)获得 Defer
效果的方法是使用 FromAsync
而不是 Defer
+ ToObservable
:
xs.Select(x => Observable.FromAsync(() => Double(x))).Merge(3)
请注意以下代码片段:
var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList();
此代码的问题在于 getResultAsync
运行 以不受约束的方式并发。在某些情况下,这可能不是我们想要的。假设我想将其并发限制为最多 10 个并发调用。 Rx.NET 的方法是什么?
我附上了一个简单的控制台应用程序,它演示了主题和我对所描述问题的蹩脚解决方案。
有一些额外的代码,比如 Stats
class 和人工随机睡眠。它们的存在是为了确保我真正获得并发执行,并且可以可靠地计算在该过程中达到的最大并发数。
方法RunUnconstrained
体现天真无拘无束运行。方法 RunConstrained
显示了我的解决方案,这不是很优雅。理想情况下,我想通过简单地将专用 Rx 运算符应用于 Monad 来简化并发约束。当然,在不牺牲性能的情况下。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace RxConstrainedConcurrency
{
class Program
{
public class Stats
{
public int MaxConcurrentCount;
public int CurConcurrentCount;
public readonly object MaxConcurrentCountGuard = new object();
}
static void Main()
{
RunUnconstrained().GetAwaiter().GetResult();
RunConstrained().GetAwaiter().GetResult();
}
static async Task RunUnconstrained()
{
await Run(AsyncOp);
}
static async Task RunConstrained()
{
using (var sem = new SemaphoreSlim(10))
{
await Run(async (s, pause, stats) =>
{
// ReSharper disable AccessToDisposedClosure
await sem.WaitAsync();
try
{
return await AsyncOp(s, pause, stats);
}
finally
{
sem.Release();
}
// ReSharper restore AccessToDisposedClosure
});
}
}
static async Task Run(Func<string, int, Stats, Task<int>> getResultAsync)
{
var stats = new Stats();
var rnd = new Random(0x1234);
var result = await GetSource(1000).SelectMany(s => getResultAsync(s, rnd.Next(30), stats).ToObservable()).ToList();
Debug.Assert(stats.CurConcurrentCount == 0);
Debug.Assert(result.Count == 1000);
Debug.Assert(!result.Contains(0));
Debug.WriteLine("Max concurrency = " + stats.MaxConcurrentCount);
}
static IObservable<string> GetSource(int count)
{
return Enumerable.Range(1, count).Select(i => i.ToString()).ToObservable();
}
static Task<int> AsyncOp(string s, int pause, Stats stats)
{
return Task.Run(() =>
{
int cur = Interlocked.Increment(ref stats.CurConcurrentCount);
if (stats.MaxConcurrentCount < cur)
{
lock (stats.MaxConcurrentCountGuard)
{
if (stats.MaxConcurrentCount < cur)
{
stats.MaxConcurrentCount = cur;
}
}
}
try
{
Thread.Sleep(pause);
return int.Parse(s);
}
finally
{
Interlocked.Decrement(ref stats.CurConcurrentCount);
}
});
}
}
}
您可以在 Rx 中使用 Merge
的重载来执行此操作,该重载限制了对内部可观察对象的并发订阅数。
这种形式的Merge
应用于流的流。
通常,使用 SelectMany
从事件调用异步任务有两个作用:将每个事件投射到一个可观察流中,其单个事件是结果,并将所有结果流压平在一起。
要使用 Merge
我们必须使用常规 Select
将每个事件投射到异步任务的调用中,(从而创建一个流的流),并使用 Merge
压平结果。它将通过在任何时间点仅订阅提供的固定数量的内部流来以受限的方式执行此操作。
我们必须小心,仅在订阅包装内部流时调用每个异步任务调用。使用 ToObservable()
将异步任务转换为可观察对象实际上会立即调用异步任务,而不是在订阅时调用,因此我们必须 延迟 评估直到使用 [=19= 订阅].
这是一个将所有这些步骤放在一起的示例:
void Main()
{
var xs = Observable.Range(0, 10); // source events
// "Double" here is our async operation to be constrained,
// in this case to 3 concurrent invocations
xs.Select(x =>
Observable.Defer(() => Double(x).ToObservable())).Merge(3)
.Subscribe(Console.WriteLine,
() => Console.WriteLine("Max: " + MaxConcurrent));
}
private static int Concurrent;
private static int MaxConcurrent;
private static readonly object gate = new Object();
public async Task<int> Double(int x)
{
var concurrent = Interlocked.Increment(ref Concurrent);
lock(gate)
{
MaxConcurrent = Math.Max(concurrent, MaxConcurrent);
}
await Task.Delay(TimeSpan.FromSeconds(1));
Interlocked.Decrement(ref Concurrent);
return x * 2;
}
此处的最大并发输出将为“3”。删除 Merge to go "unconstrained" 你会得到“10”。
另一种(等效的)获得 Defer
效果的方法是使用 FromAsync
而不是 Defer
+ ToObservable
:
xs.Select(x => Observable.FromAsync(() => Double(x))).Merge(3)