通过回调消息代理将推送公开为 IAsyncEnumerable

Exposing a push via callback message broker as an IAsyncEnumerable

我正在使用第三方库作为发布-订阅消息代理的接口。经纪人是 Solace PubSub+。

对于订阅者,供应商库采用“通过回调推送消息”模式。

我正在围绕供应商库编写自己的包装器库,以便其他开发人员更容易使用(隐藏库如何与网络通信等的所有内部结构)。

同样,我认为将订阅者提要公开为 IAsyncEnumerable 可能会有所帮助,而且我认为这可能是 System.Threading.Channels 的一个很好的用例。我有两个顾虑:

  1. 这里的渠道是否合适,还是我设计过度了?即,是否有更“C# 惯用”的方式来包装回调?
  2. 我的 EnumerableBroker 包装器实现是否安全,或者我是否陷入了某个地方的异步陷阱?

我意识到第一个问题可能比 SO 更适合 CodeReview,但由于该问题的答案也与第二个问题相关,因此将它们放在一起似乎是合适的。值得注意的是:我正在避免使用 IObservable / Rx,因为我的目标是使我的界面 比供应商的界面更基础 ,而不是要求其他开发人员和我自己学习 Rx!理解生产者和消费者过程是如何独立的对于中间的通道也是微不足道的,而对于可观察的我的第一个心理过程是“好的,那么生产者和消费者仍然独立吗?乍一看我必须现在了解调度程序...天哪,我只使用 await foreach 怎么样?"

这是一个没有 EnumerableBroker:

的消费消息的最小模型
// mockup of third party class
private class Broker
{
    // mockup of how the third party library pushes messages via callback
    public void Subscribe(EventHandler<int> handler) => this.handler = handler;

    //simulate the broker pushing messages. Not "real" code
    public void Start()
    {
        Task.Run
        (
            () =>
            {
                for (int i = 0; !cts.Token.IsCancellationRequested; i++)
                {
                    // simulate internal latency
                    Thread.Sleep(10);
                    handler?.Invoke(this, i);
                }
            }, cts.Token
        );
    }

    public void Stop() => cts.Cancel();

    private CancellationTokenSource cts = new();
    private EventHandler<int> handler;
}

private static async Task Main()
{
    var broker = new Broker();
    broker.Subscribe((_, msg) => Console.WriteLine(msg));
    broker.Start();
    await Task.Delay(1000);
    broker.Stop();
}

并且现在对 EnumerableBroker 进行了最少的复制(仍然使用上面列出的相同模拟 Broker class)。这里至少有一个好处似乎是,如果订阅者需要做大量工作来处理一条消息,它不会占用代理的线程——至少在缓冲区填满之前是这样。 似乎 可以正常工作,但我学会了警惕我对异步的有限掌握。

private class EnumerableBroker
{
    public EnumerableBroker(int bufferSize = 8)
    {
        buffer = Channel.CreateBounded<int>
        (
            new BoundedChannelOptions(bufferSize) { SingleReader = true,
                SingleWriter = true }
        );
    }

    public IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
    {
        broker.Subscribe
        (
            // switched to sync per Theodor's comments
            (_, args) => buffer.Writer.WriteAsync(args, ct).AsTask().Wait()
        );
        ct.Register(broker.Stop);
        broker.Start();
        return buffer.Reader.ReadAllAsync(ct);
    }

    private readonly Channel<int> buffer;
    private readonly Broker broker = new();
}

private static async Task Main()
{
    var cts = new CancellationTokenSource();
    var broker = new EnumerableBroker();
    cts.CancelAfter(1000);
    try
    {
        await foreach (var msg in broker.ReadAsync(cts.Token))
        {
            Console.WriteLine(msg);
        }
    }
    catch (OperationCanceledException) { }
}

Am I overengineering this?

没有。 Channel 正是实现此功能所需的组件类型。这是一个非常简单的机制。它基本上是 BlockingCollection<T> class 的异步版本,具有一些额外的功能(如 Completion 属性)和花哨的 API(ReaderWriter 个立面)。

Is my EnumerableBroker wrapper implementation safe, or have I fallen into an async trap somewhere?

是的,有一个陷阱,你已经掉进去了。 SingleWriter = true 配置意味着最多允许同时进行一个 WriteAsync 操作。在发布下一个 WriteAsync 之前,必须先完成上一个。通过使用 async void 委托订阅 broker,您实质上是为代理推送的每条消息创建一个单独的编写器(生产者)。很可能该组件会通过抛出 InvalidOperationExceptions 或其他东西来抱怨这种滥用。不过,解决方案是 而不是 切换到 SingleWriter = false。这只会绕过 Channel 的有限容量,方法是创建一个外部且效率极低的队列,其中的消息不适合 Channel 的内部队列。解决方案是重新考虑您的缓冲策略。如果您不能缓冲无限数量的消息,则必须 drop messages,或者抛出异常并杀死消费者。与 await buffer.Writer.WriteAsync 不同,最好与 bool accepted = buffer.Writer.TryWrite 同步馈送频道,并在 acceptedfalse 时采取适当的措施。

您应该记住的另一个注意事项是 ChannelReader.ReadAllAsync 方法 消耗 。这意味着如果您有多个 readers/consumers 的同一频道,则每条消息将只传递给其中一个消费者。换句话说,每个消费者将收到通道消息的部分子集。您应该将此告知您的同事,因为多次枚举相同的 IAsyncEnumerable<T> 非常简单。毕竟 IAsyncEnumerable<T> 只不过是 IAsyncEnumerator<T> 的工厂。

最后,不用 CancellationToken 控制每个订阅的生命周期,您可以在 IAsyncEnumerator<T> 的枚举终止时自动终止订阅,从而让您的同事的生活更轻松。当 await foreach 循环以任何方式结束时(例如 break 或异常),关联的 IAsyncEnumerator<T> 会自动释放。如果 try/finally 块包装了屈服循环,C# 语言巧妙地将 DisposeAsync 调用与迭代器的 finally 块挂钩。您可以像这样利用这个强大的功能:

public async IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
{
    broker.Subscribe
    (
        //...
    );
    broker.Start();
    try
    {
        await foreach (var msg in buffer.Reader.ReadAllAsync(ct))
        {
            yield return msg;
        }
    }
    finally
    {
        broker.Stop();
    }
}

仅在使用 Channels 不需要那么多代码的意义上,这是过度设计的。一个典型的模式是只使用 methods 接受 ChannelReader 作为输入和 return a ChannelReader 作为输出,方法本身创建和拥有输出通道。这使得将阶段组合到管道中变得非常容易,尤其是当这些方法是扩展方法时。

在这种情况下,您的代码可以重写为:

static ChannelReader<int> ToChannel(this Broker broker, 
    int limit,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<int>(limit);
    var writer=channel.Writer;

    broker.Subscribe((_, args) =>{
        writer.TryWrite(args, token);
    });
    token.Register(()=>writer.Complete());

    return channel;
}

这将丢失超过限制的所有邮件。如果您的 Broker 理解 Task,您可以使用:

broker.Subscribe(async (_, args) =>{
        await writer.WriteAsync(args, token);
    });

如果它不理解任务,而且你不能失去任何东西,也许 更好的 解决方案是使用 unbounded channel 和 handle pause/resume 在后期。您已经问过类似的问题。

否则,您将不得不阻止回调:

broker.Subscribe(async (_, args) =>{
       writer.WriteAsync(args, token).AsTask().Wait();
    });

虽然这不是一个理想的解决方案。

在这两种情况下,您都可以使用 reader:

生成的数据
var token=cts.Token;
var reader=broker.ToChannel(10,token);

await foreach(var item in reader.ReadAllAsync(token))
{
...
}