可以为多个消费者缓存 IAsyncEnumerable 吗?

Is it OK to cache IAsyncEnumerable for multiple consumers?

我正在研究用 IAsyncEnumerable 替换一些常规 C# 事件模式实例的效果。这将通过 IAsyncEnumerable 的惰性 instantiation/activation 并缓存该引用以供所有 callers/listeners 使用来实现。一些快速测试(见下文)表明这是可行的,但我还没有在网上看到其他以这种方式使用 IAsyncEnumerable 的示例。

我意识到这并不是 IAsyncEnumerable 的创建目的,在这种情况下大多数人会提倡使用 ReactiveX (https://github.com/dotnet/reactive)。然而,我很欣赏分析为什么人们愿意或不愿意按照描述的那样做(而不是如何用 Rx 做到这一点)。我在下面提供了几个示例。我的候选事件模式替换是一个更像是事件流的模式(比如从串行连接或 UDP 套接字等产生的反序列化消息)

示例 1:

  class Program
  {
    public static async Task Main( string[] args )
    {
      // Cache an async enumerable (imagine stream of events e.g. deserialized messages arriving)
      var asyncEnumerable = GetNumbersAsync( 10 );

      // Now multiple consumers want to respond to incoming events/messages (in this case just numbers)
      await Task.WhenAll( Task.Run( () => ProcessNumbersAsync( 1, asyncEnumerable ) ), Task.Run( () => ProcessNumbersAsync( 2, asyncEnumerable ) ) );

      Console.WriteLine( "DONE!");
    }
    
    private static async Task ProcessNumbersAsync( int id, IAsyncEnumerable<int> numbers )
    {
      await foreach ( var n in numbers )
        Console.WriteLine( $"{id}: Processing {n}" );
    }

    private static async IAsyncEnumerable<int> GetNumbersAsync( int maxNumber )
    {
      // This would really be async read operations from a remote source
      for ( var i = 0; i < maxNumber; i++ )
      {
        await Task.Delay( 100 );
        yield return i;
      }
    }
  }

作为此模式的用户,这会产生我想要的输出:

1: Processing 0
2: Processing 0
2: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
2: Processing 3
1: Processing 3
2: Processing 4
1: Processing 4
2: Processing 5
1: Processing 5
1: Processing 6
2: Processing 6
1: Processing 7
2: Processing 7
2: Processing 8
1: Processing 8
2: Processing 9
1: Processing 9
DONE!

前面的示例将每个消费者放在不同的线程上,但基于上下文(可能是 WPF 应用程序),同一线程上可能有多个消费者(使用 IEnumerable 不可能,但使用 IAsyncEnumerable 可以打开门)。以下是在控制台应用程序中,但可以想象生产者和消费者是在 WPF 应用程序的 UI 线程上创建的。

示例 2:

  class Program
  {
    public static async Task Main( string[] args )
    {
      var producer = new Producer();
      var consumer1 = new Consumer( 1, producer );
      var consumer2 = new Consumer( 2, producer );
      var consumer3 = new Consumer( 3, producer );

      await Task.WhenAll( consumer1.ConsumeMessagesAsync(), consumer2.ConsumeMessagesAsync(), Task.Run( async () => await consumer3.ConsumeMessagesAsync() ) );

      Console.WriteLine( "DONE!");
    }

    // Singleton producer
    private interface IProducer
    {
      IAsyncEnumerable<int> GetMessagesAsync();
    }

    // Transient consumer
    private interface IConsumer
    {
      Task ConsumeMessagesAsync();
    }

    private class Producer : IProducer
    {
      private const int _maxFakeMessages = 10;
      private readonly object _mutex = new Object();

      private IAsyncEnumerable<int> _actualIncomingMessagesEnumerable;
      
      public IAsyncEnumerable<int> GetMessagesAsync()
      {
        // TODO: use AsyncEx AsyncLock
        lock ( _mutex )
        {
          if ( _actualIncomingMessagesEnumerable == null)
            _actualIncomingMessagesEnumerable = ReadIncomingMessagesAsync();
        }

        return _actualIncomingMessagesEnumerable;
      }

      private static async IAsyncEnumerable<int> ReadIncomingMessagesAsync()
      {
        for ( var i = 0; i < _maxFakeMessages; i++ )
        {
          await Task.Delay( 100 );
          yield return i;
        }
      }
    }

    private class Consumer
    {
      private readonly int _id;
      private readonly IProducer _producer;
      public Consumer( int id,  IProducer producer )
      {
        _id = id;
        _producer = producer;
      }

      public async Task ConsumeMessagesAsync()
      {
        await foreach( var n in _producer.GetMessagesAsync() )
          Console.WriteLine( $"{_id}: Processing {n}" );
      }
    }
  }

同样,此输出是我作为用户想要的:

1: Processing 0
2: Processing 0
3: Processing 0
2: Processing 1
3: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
3: Processing 2
1: Processing 3
2: Processing 3
3: Processing 3
1: Processing 4
2: Processing 4
3: Processing 4
3: Processing 5
1: Processing 5
2: Processing 5
1: Processing 6
3: Processing 6
2: Processing 6
2: Processing 7
1: Processing 7
3: Processing 7
3: Processing 8
2: Processing 8
1: Processing 8
1: Processing 9
3: Processing 9
2: Processing 9
DONE!

像这样的模式固有的一个好处是 consumer/caller 可以让它们的 callback/item-of-type-T-handling-code 发生在它们自己的 SynchronizationContext 中。通常,SerialPort 或 Timer 或其他来源的事件可能发生在后台线程上,而用户——尤其是在 UI 线程上——可能需要执行他们自己的同步。在这种情况下,UI 线程上的消费者始终可以在 UI 线程上执行代码,而控制台应用程序中的用户将在线程池上执行代码。

我是不是漏掉了什么?

看来您正在寻找频道。

An Introduction to System.Threading.Channels

Working with Channels in .NET

让我们稍微改变第一个示例的“事件源”的实现,GetNumbersAsync 方法:

private static int _current = 0;
private static async IAsyncEnumerable<int> GetNumbersAsync(int maxNumber)
{
    // This would really be async read operations from a remote source
    for (var i = 0; i < maxNumber; i++)
    {
        await Task.Delay(100);
        yield return Interlocked.Increment(ref _current);
    }
}

更改后的输出如下:

1: Processing 1
2: Processing 2
2: Processing 4
1: Processing 3
2: Processing 5
1: Processing 6
1: Processing 8
2: Processing 7
2: Processing 9
1: Processing 10
1: Processing 12
2: Processing 11
1: Processing 14
2: Processing 13
1: Processing 15
2: Processing 16
1: Processing 17
2: Processing 18
1: Processing 19
2: Processing 20

每个消费者收到不同的“事件”!

虽然您示例中的 IAsyncEnumerable 是单个缓存实例,但每次您尝试使用 await foreach 语句枚举它时,新的 IAsyncEnumerator is created, with its life bounded with this specific enumeration. The IAsyncEnumerators are neither thread-safe nor reusable, and if you try to cache one and share it between consumers, with each consumer calling its MoveNextAsync 方法没有同步,您将得到未定义的行为。

如果您想要一个可以随时安全 subscribed/unsubscribed 的 IAsyncEnumerable 来源,并将所有消息传播给可能以不同速度使用它们的订阅者,那么它远没有像缓存由 C# 迭代器创建的 IAsyncEnumerable(一种包含 yield 语句的方法)。您可以找到 AsyncEnumerableSource here.

的实现