强制 Observable 订阅者一次只写入一个流

Enforce Observable Subscribers to only write to the stream one at a time

我目前正在使用 observables 来管理在总线上生成的消息,这些消息被推送到各种流上。

一切正常,但随着消息的传入,系统可能会尝试同时将多条消息写入流(即来自多个线程的消息),或者消息的发布速度比预期的要快写入流...如您所见,这会在写入时引起问题。

因此,我想弄清楚如何组织事情,以便在收到消息时一次只处理一个。有什么想法吗?

public class MessageStreamResource : IResourceStartup
{
    private readonly IBus _bus;
    private readonly ISubject<string> _sender;

    public MessageStreamResource(IBus bus)
    {
        _bus = bus;

        _senderSubject = new Subject<string>();

        //`All` can publish messages at the same time as it's
        //collecting data being generated from different threads
        _bus.All.Subscribe(message => Observable.Start(() => ProcessMessage(message), TaskPoolScheduler.Default));

        //Note the above hops off the calls context so that the 
        //writing to the stream wont slow down the caller.
    }

    public void Configure(IAppBuilder app)
    {
        app.Map("/stream", async context =>
        {
            ...

            await context.Response.WriteAsync("Lets party!\n");
            await context.Response.Body.FlushAsync();

            var unSubscribe = _sender.Subscribe(async t =>
            {
                //PROBLEM HERE
                //I only want this callback to be executed 
                //one at a time...

                await context.Response.WriteAsync($"{t}\n");
                await context.Response.Body.FlushAsync();
            });

            ...

            await HoldOpenTask;
        });
    }

    private void ProcessMessage(IMessage message)
    {
        _sender.OnNext(message.Payload);
    }
}

如果我正确理解了这个问题,这可能可以用 SemaphoreSlim:

来完成
// ...
var semaphore = new SemaphoreSlim(initialCount: 1);

var unSubscribe = _sender.Subscribe(async t =>
{
    //PROBLEM HERE
    //I only want this callback to be executed 
    //one at a time...

    await semaphore.WaitAsync();
    try
    {
        await context.Response.WriteAsync($"{t}\n");
        await context.Response.Body.FlushAsync();
    }
    finally
    {
        semaphore.Release();
    }
});

SemaphoreSlimIDisposable,确保在适当的时候处理它。

已更新,从第二次看,MapExtensions.Map 接受 Action<IAppBuilder>,因此您传递的是 async void lambda,本质上是创建了一堆即发即弃的异步操作。 Map 呼叫将 return 给呼叫者,而他们可能仍在徘徊。这很可能不是你想要的,是吗?