强制 Observable 订阅者一次只写入一个流
Enforce Observable Subscribers to only write to the stream one at a time
我目前正在使用 observables 来管理在总线上生成的消息,这些消息被推送到各种流上。
一切正常,但随着消息的传入,系统可能会尝试同时将多条消息写入流(即来自多个线程的消息),或者消息的发布速度比预期的要快写入流...如您所见,这会在写入时引起问题。
因此,我想弄清楚如何组织事情,以便在收到消息时一次只处理一个。有什么想法吗?
public class MessageStreamResource : IResourceStartup
{
private readonly IBus _bus;
private readonly ISubject<string> _sender;
public MessageStreamResource(IBus bus)
{
_bus = bus;
_senderSubject = new Subject<string>();
//`All` can publish messages at the same time as it's
//collecting data being generated from different threads
_bus.All.Subscribe(message => Observable.Start(() => ProcessMessage(message), TaskPoolScheduler.Default));
//Note the above hops off the calls context so that the
//writing to the stream wont slow down the caller.
}
public void Configure(IAppBuilder app)
{
app.Map("/stream", async context =>
{
...
await context.Response.WriteAsync("Lets party!\n");
await context.Response.Body.FlushAsync();
var unSubscribe = _sender.Subscribe(async t =>
{
//PROBLEM HERE
//I only want this callback to be executed
//one at a time...
await context.Response.WriteAsync($"{t}\n");
await context.Response.Body.FlushAsync();
});
...
await HoldOpenTask;
});
}
private void ProcessMessage(IMessage message)
{
_sender.OnNext(message.Payload);
}
}
如果我正确理解了这个问题,这可能可以用 SemaphoreSlim
:
来完成
// ...
var semaphore = new SemaphoreSlim(initialCount: 1);
var unSubscribe = _sender.Subscribe(async t =>
{
//PROBLEM HERE
//I only want this callback to be executed
//one at a time...
await semaphore.WaitAsync();
try
{
await context.Response.WriteAsync($"{t}\n");
await context.Response.Body.FlushAsync();
}
finally
{
semaphore.Release();
}
});
SemaphoreSlim
是 IDisposable
,确保在适当的时候处理它。
已更新,从第二次看,MapExtensions.Map
接受 Action<IAppBuilder>
,因此您传递的是 async void
lambda,本质上是创建了一堆即发即弃的异步操作。 Map
呼叫将 return 给呼叫者,而他们可能仍在徘徊。这很可能不是你想要的,是吗?
我目前正在使用 observables 来管理在总线上生成的消息,这些消息被推送到各种流上。
一切正常,但随着消息的传入,系统可能会尝试同时将多条消息写入流(即来自多个线程的消息),或者消息的发布速度比预期的要快写入流...如您所见,这会在写入时引起问题。
因此,我想弄清楚如何组织事情,以便在收到消息时一次只处理一个。有什么想法吗?
public class MessageStreamResource : IResourceStartup
{
private readonly IBus _bus;
private readonly ISubject<string> _sender;
public MessageStreamResource(IBus bus)
{
_bus = bus;
_senderSubject = new Subject<string>();
//`All` can publish messages at the same time as it's
//collecting data being generated from different threads
_bus.All.Subscribe(message => Observable.Start(() => ProcessMessage(message), TaskPoolScheduler.Default));
//Note the above hops off the calls context so that the
//writing to the stream wont slow down the caller.
}
public void Configure(IAppBuilder app)
{
app.Map("/stream", async context =>
{
...
await context.Response.WriteAsync("Lets party!\n");
await context.Response.Body.FlushAsync();
var unSubscribe = _sender.Subscribe(async t =>
{
//PROBLEM HERE
//I only want this callback to be executed
//one at a time...
await context.Response.WriteAsync($"{t}\n");
await context.Response.Body.FlushAsync();
});
...
await HoldOpenTask;
});
}
private void ProcessMessage(IMessage message)
{
_sender.OnNext(message.Payload);
}
}
如果我正确理解了这个问题,这可能可以用 SemaphoreSlim
:
// ...
var semaphore = new SemaphoreSlim(initialCount: 1);
var unSubscribe = _sender.Subscribe(async t =>
{
//PROBLEM HERE
//I only want this callback to be executed
//one at a time...
await semaphore.WaitAsync();
try
{
await context.Response.WriteAsync($"{t}\n");
await context.Response.Body.FlushAsync();
}
finally
{
semaphore.Release();
}
});
SemaphoreSlim
是 IDisposable
,确保在适当的时候处理它。
已更新,从第二次看,MapExtensions.Map
接受 Action<IAppBuilder>
,因此您传递的是 async void
lambda,本质上是创建了一堆即发即弃的异步操作。 Map
呼叫将 return 给呼叫者,而他们可能仍在徘徊。这很可能不是你想要的,是吗?