可以为多个消费者缓存 IAsyncEnumerable 吗?
Is it OK to cache IAsyncEnumerable for multiple consumers?
我正在研究用 IAsyncEnumerable 替换一些常规 C# 事件模式实例的效果。这将通过 IAsyncEnumerable 的惰性 instantiation/activation 并缓存该引用以供所有 callers/listeners 使用来实现。一些快速测试(见下文)表明这是可行的,但我还没有在网上看到其他以这种方式使用 IAsyncEnumerable 的示例。
我意识到这并不是 IAsyncEnumerable 的创建目的,在这种情况下大多数人会提倡使用 ReactiveX (https://github.com/dotnet/reactive)。然而,我很欣赏分析为什么人们愿意或不愿意按照描述的那样做(而不是如何用 Rx 做到这一点)。我在下面提供了几个示例。我的候选事件模式替换是一个更像是事件流的模式(比如从串行连接或 UDP 套接字等产生的反序列化消息)
示例 1:
class Program
{
public static async Task Main( string[] args )
{
// Cache an async enumerable (imagine stream of events e.g. deserialized messages arriving)
var asyncEnumerable = GetNumbersAsync( 10 );
// Now multiple consumers want to respond to incoming events/messages (in this case just numbers)
await Task.WhenAll( Task.Run( () => ProcessNumbersAsync( 1, asyncEnumerable ) ), Task.Run( () => ProcessNumbersAsync( 2, asyncEnumerable ) ) );
Console.WriteLine( "DONE!");
}
private static async Task ProcessNumbersAsync( int id, IAsyncEnumerable<int> numbers )
{
await foreach ( var n in numbers )
Console.WriteLine( $"{id}: Processing {n}" );
}
private static async IAsyncEnumerable<int> GetNumbersAsync( int maxNumber )
{
// This would really be async read operations from a remote source
for ( var i = 0; i < maxNumber; i++ )
{
await Task.Delay( 100 );
yield return i;
}
}
}
作为此模式的用户,这会产生我想要的输出:
1: Processing 0
2: Processing 0
2: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
2: Processing 3
1: Processing 3
2: Processing 4
1: Processing 4
2: Processing 5
1: Processing 5
1: Processing 6
2: Processing 6
1: Processing 7
2: Processing 7
2: Processing 8
1: Processing 8
2: Processing 9
1: Processing 9
DONE!
前面的示例将每个消费者放在不同的线程上,但基于上下文(可能是 WPF 应用程序),同一线程上可能有多个消费者(使用 IEnumerable 不可能,但使用 IAsyncEnumerable 可以打开门)。以下是在控制台应用程序中,但可以想象生产者和消费者是在 WPF 应用程序的 UI 线程上创建的。
示例 2:
class Program
{
public static async Task Main( string[] args )
{
var producer = new Producer();
var consumer1 = new Consumer( 1, producer );
var consumer2 = new Consumer( 2, producer );
var consumer3 = new Consumer( 3, producer );
await Task.WhenAll( consumer1.ConsumeMessagesAsync(), consumer2.ConsumeMessagesAsync(), Task.Run( async () => await consumer3.ConsumeMessagesAsync() ) );
Console.WriteLine( "DONE!");
}
// Singleton producer
private interface IProducer
{
IAsyncEnumerable<int> GetMessagesAsync();
}
// Transient consumer
private interface IConsumer
{
Task ConsumeMessagesAsync();
}
private class Producer : IProducer
{
private const int _maxFakeMessages = 10;
private readonly object _mutex = new Object();
private IAsyncEnumerable<int> _actualIncomingMessagesEnumerable;
public IAsyncEnumerable<int> GetMessagesAsync()
{
// TODO: use AsyncEx AsyncLock
lock ( _mutex )
{
if ( _actualIncomingMessagesEnumerable == null)
_actualIncomingMessagesEnumerable = ReadIncomingMessagesAsync();
}
return _actualIncomingMessagesEnumerable;
}
private static async IAsyncEnumerable<int> ReadIncomingMessagesAsync()
{
for ( var i = 0; i < _maxFakeMessages; i++ )
{
await Task.Delay( 100 );
yield return i;
}
}
}
private class Consumer
{
private readonly int _id;
private readonly IProducer _producer;
public Consumer( int id, IProducer producer )
{
_id = id;
_producer = producer;
}
public async Task ConsumeMessagesAsync()
{
await foreach( var n in _producer.GetMessagesAsync() )
Console.WriteLine( $"{_id}: Processing {n}" );
}
}
}
同样,此输出是我作为用户想要的:
1: Processing 0
2: Processing 0
3: Processing 0
2: Processing 1
3: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
3: Processing 2
1: Processing 3
2: Processing 3
3: Processing 3
1: Processing 4
2: Processing 4
3: Processing 4
3: Processing 5
1: Processing 5
2: Processing 5
1: Processing 6
3: Processing 6
2: Processing 6
2: Processing 7
1: Processing 7
3: Processing 7
3: Processing 8
2: Processing 8
1: Processing 8
1: Processing 9
3: Processing 9
2: Processing 9
DONE!
像这样的模式固有的一个好处是 consumer/caller 可以让它们的 callback/item-of-type-T-handling-code 发生在它们自己的 SynchronizationContext 中。通常,SerialPort 或 Timer 或其他来源的事件可能发生在后台线程上,而用户——尤其是在 UI 线程上——可能需要执行他们自己的同步。在这种情况下,UI 线程上的消费者始终可以在 UI 线程上执行代码,而控制台应用程序中的用户将在线程池上执行代码。
我是不是漏掉了什么?
看来您正在寻找频道。
让我们稍微改变第一个示例的“事件源”的实现,GetNumbersAsync
方法:
private static int _current = 0;
private static async IAsyncEnumerable<int> GetNumbersAsync(int maxNumber)
{
// This would really be async read operations from a remote source
for (var i = 0; i < maxNumber; i++)
{
await Task.Delay(100);
yield return Interlocked.Increment(ref _current);
}
}
更改后的输出如下:
1: Processing 1
2: Processing 2
2: Processing 4
1: Processing 3
2: Processing 5
1: Processing 6
1: Processing 8
2: Processing 7
2: Processing 9
1: Processing 10
1: Processing 12
2: Processing 11
1: Processing 14
2: Processing 13
1: Processing 15
2: Processing 16
1: Processing 17
2: Processing 18
1: Processing 19
2: Processing 20
每个消费者收到不同的“事件”!
虽然您示例中的 IAsyncEnumerable
是单个缓存实例,但每次您尝试使用 await foreach
语句枚举它时,新的 IAsyncEnumerator
is created, with its life bounded with this specific enumeration. The IAsyncEnumerator
s are neither thread-safe nor reusable, and if you try to cache one and share it between consumers, with each consumer calling its MoveNextAsync
方法没有同步,您将得到未定义的行为。
如果您想要一个可以随时安全 subscribed/unsubscribed 的 IAsyncEnumerable
来源,并将所有消息传播给可能以不同速度使用它们的订阅者,那么它远没有像缓存由 C# 迭代器创建的 IAsyncEnumerable
(一种包含 yield
语句的方法)。您可以找到 AsyncEnumerableSource
here.
的实现
我正在研究用 IAsyncEnumerable 替换一些常规 C# 事件模式实例的效果。这将通过 IAsyncEnumerable 的惰性 instantiation/activation 并缓存该引用以供所有 callers/listeners 使用来实现。一些快速测试(见下文)表明这是可行的,但我还没有在网上看到其他以这种方式使用 IAsyncEnumerable 的示例。
我意识到这并不是 IAsyncEnumerable 的创建目的,在这种情况下大多数人会提倡使用 ReactiveX (https://github.com/dotnet/reactive)。然而,我很欣赏分析为什么人们愿意或不愿意按照描述的那样做(而不是如何用 Rx 做到这一点)。我在下面提供了几个示例。我的候选事件模式替换是一个更像是事件流的模式(比如从串行连接或 UDP 套接字等产生的反序列化消息)
示例 1:
class Program
{
public static async Task Main( string[] args )
{
// Cache an async enumerable (imagine stream of events e.g. deserialized messages arriving)
var asyncEnumerable = GetNumbersAsync( 10 );
// Now multiple consumers want to respond to incoming events/messages (in this case just numbers)
await Task.WhenAll( Task.Run( () => ProcessNumbersAsync( 1, asyncEnumerable ) ), Task.Run( () => ProcessNumbersAsync( 2, asyncEnumerable ) ) );
Console.WriteLine( "DONE!");
}
private static async Task ProcessNumbersAsync( int id, IAsyncEnumerable<int> numbers )
{
await foreach ( var n in numbers )
Console.WriteLine( $"{id}: Processing {n}" );
}
private static async IAsyncEnumerable<int> GetNumbersAsync( int maxNumber )
{
// This would really be async read operations from a remote source
for ( var i = 0; i < maxNumber; i++ )
{
await Task.Delay( 100 );
yield return i;
}
}
}
作为此模式的用户,这会产生我想要的输出:
1: Processing 0
2: Processing 0
2: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
2: Processing 3
1: Processing 3
2: Processing 4
1: Processing 4
2: Processing 5
1: Processing 5
1: Processing 6
2: Processing 6
1: Processing 7
2: Processing 7
2: Processing 8
1: Processing 8
2: Processing 9
1: Processing 9
DONE!
前面的示例将每个消费者放在不同的线程上,但基于上下文(可能是 WPF 应用程序),同一线程上可能有多个消费者(使用 IEnumerable 不可能,但使用 IAsyncEnumerable 可以打开门)。以下是在控制台应用程序中,但可以想象生产者和消费者是在 WPF 应用程序的 UI 线程上创建的。
示例 2:
class Program
{
public static async Task Main( string[] args )
{
var producer = new Producer();
var consumer1 = new Consumer( 1, producer );
var consumer2 = new Consumer( 2, producer );
var consumer3 = new Consumer( 3, producer );
await Task.WhenAll( consumer1.ConsumeMessagesAsync(), consumer2.ConsumeMessagesAsync(), Task.Run( async () => await consumer3.ConsumeMessagesAsync() ) );
Console.WriteLine( "DONE!");
}
// Singleton producer
private interface IProducer
{
IAsyncEnumerable<int> GetMessagesAsync();
}
// Transient consumer
private interface IConsumer
{
Task ConsumeMessagesAsync();
}
private class Producer : IProducer
{
private const int _maxFakeMessages = 10;
private readonly object _mutex = new Object();
private IAsyncEnumerable<int> _actualIncomingMessagesEnumerable;
public IAsyncEnumerable<int> GetMessagesAsync()
{
// TODO: use AsyncEx AsyncLock
lock ( _mutex )
{
if ( _actualIncomingMessagesEnumerable == null)
_actualIncomingMessagesEnumerable = ReadIncomingMessagesAsync();
}
return _actualIncomingMessagesEnumerable;
}
private static async IAsyncEnumerable<int> ReadIncomingMessagesAsync()
{
for ( var i = 0; i < _maxFakeMessages; i++ )
{
await Task.Delay( 100 );
yield return i;
}
}
}
private class Consumer
{
private readonly int _id;
private readonly IProducer _producer;
public Consumer( int id, IProducer producer )
{
_id = id;
_producer = producer;
}
public async Task ConsumeMessagesAsync()
{
await foreach( var n in _producer.GetMessagesAsync() )
Console.WriteLine( $"{_id}: Processing {n}" );
}
}
}
同样,此输出是我作为用户想要的:
1: Processing 0
2: Processing 0
3: Processing 0
2: Processing 1
3: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
3: Processing 2
1: Processing 3
2: Processing 3
3: Processing 3
1: Processing 4
2: Processing 4
3: Processing 4
3: Processing 5
1: Processing 5
2: Processing 5
1: Processing 6
3: Processing 6
2: Processing 6
2: Processing 7
1: Processing 7
3: Processing 7
3: Processing 8
2: Processing 8
1: Processing 8
1: Processing 9
3: Processing 9
2: Processing 9
DONE!
像这样的模式固有的一个好处是 consumer/caller 可以让它们的 callback/item-of-type-T-handling-code 发生在它们自己的 SynchronizationContext 中。通常,SerialPort 或 Timer 或其他来源的事件可能发生在后台线程上,而用户——尤其是在 UI 线程上——可能需要执行他们自己的同步。在这种情况下,UI 线程上的消费者始终可以在 UI 线程上执行代码,而控制台应用程序中的用户将在线程池上执行代码。
我是不是漏掉了什么?
看来您正在寻找频道。
让我们稍微改变第一个示例的“事件源”的实现,GetNumbersAsync
方法:
private static int _current = 0;
private static async IAsyncEnumerable<int> GetNumbersAsync(int maxNumber)
{
// This would really be async read operations from a remote source
for (var i = 0; i < maxNumber; i++)
{
await Task.Delay(100);
yield return Interlocked.Increment(ref _current);
}
}
更改后的输出如下:
1: Processing 1
2: Processing 2
2: Processing 4
1: Processing 3
2: Processing 5
1: Processing 6
1: Processing 8
2: Processing 7
2: Processing 9
1: Processing 10
1: Processing 12
2: Processing 11
1: Processing 14
2: Processing 13
1: Processing 15
2: Processing 16
1: Processing 17
2: Processing 18
1: Processing 19
2: Processing 20
每个消费者收到不同的“事件”!
虽然您示例中的 IAsyncEnumerable
是单个缓存实例,但每次您尝试使用 await foreach
语句枚举它时,新的 IAsyncEnumerator
is created, with its life bounded with this specific enumeration. The IAsyncEnumerator
s are neither thread-safe nor reusable, and if you try to cache one and share it between consumers, with each consumer calling its MoveNextAsync
方法没有同步,您将得到未定义的行为。
如果您想要一个可以随时安全 subscribed/unsubscribed 的 IAsyncEnumerable
来源,并将所有消息传播给可能以不同速度使用它们的订阅者,那么它远没有像缓存由 C# 迭代器创建的 IAsyncEnumerable
(一种包含 yield
语句的方法)。您可以找到 AsyncEnumerableSource
here.