多个消费者不丢失消息
Multiple consumers without losing messages
我有大量的东西通过 redis pub/sub 并且我需要将它分发到多个 websocket 连接,所以基本上无论何时来自 redis 的消息,它都需要通过所有 websockets 分发连接。
我想要多个消费者。他们每个人都应该收到所有消息。
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false
});
var cts = new CancellationTokenSource();
var producer = Task.Run(async () =>
{
int i = 0;
while (!cts.IsCancellationRequested)
{
channel.Writer.TryWrite(i++);
await Task.Delay(TimeSpan.FromMilliseconds(250));
}
});
var readerOneTask = Task.Run(async () =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Reader one: {i}");
}
});
var readerTwoTask = Task.Run(async () =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Reader two: {i}");
}
});
cts.CancelAfter(TimeSpan.FromSeconds(5));
Console.ReadLine();
单个 Channel<T>
无法向多个消费者广播消息。每次从通道中读取消息时,消息都会被消费,其他消费者将无法获取它。如果您想向所有消费者广播所有消息,则必须为每个消费者创建一个专用 Channel<T>
。
您可能会发现这个问题很有趣:Factory for IAsyncEnumerable or IAsyncEnumerator。它显示了为 IAsyncEnumerable<T>
序列实现 source/controller 的各种方法,其中包括通道和 Rx 主题。
更新: 下面演示了如何使用多个通道,以便将所有消息传播给所有消费者。
List<Channel<int>> channels = new();
async Task CreateConsumer(Func<Channel<int>, Task> body)
{
var channel = Channel.CreateUnbounded<int>();
lock (channels) channels.Add(channel);
try
{
await Task.Run(() => body(channel)).ConfigureAwait(false);
}
finally
{
lock (channels) channels.Remove(channel);
}
}
Task consumer1 = CreateConsumer(async channel =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer one: {i}");
}
});
Task consumer2 = CreateConsumer(async channel =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer two: {i}");
}
});
using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(3000));
Task producer = Task.Run(async () =>
{
int i = 0;
while (true)
{
i++;
lock (channels) channels.ForEach(channel => channel.Writer.TryWrite(i));
try { await Task.Delay(TimeSpan.FromMilliseconds(250), cts.Token); }
catch (OperationCanceledException) { break; }
}
});
producer.Wait();
lock (channels) channels.ForEach(channel => channel.Writer.Complete());
Task.WaitAll(consumer1, consumer2);
CreateConsumer
是一个异步方法,负责创建频道并将其添加到列表中。它还负责在消费者完成时从列表中删除频道。这很重要,否则如果消费者失败,生产者将继续在死通道中推送消息,从而导致内存泄漏。
消费者的“主体”(每个消费者可能不同)作为异步 lambda 传递给 CreateConsumer
方法。
在启动生产者之前启动所有消费者并创建他们的通道很重要。这就是 CreateConsumer
方法 而不是 包裹在 Task.Run
中的原因。这样 CreateConsumer
中的代码直到第一个 await
在调用 CreateConsumer
.
的同一个线程上同步运行
每次访问带有频道的列表都受到 lock
的保护,因为多个线程可能会同时尝试 read/modify 列表。
你问的是可以的,只是不是这样。
Channel 是一个单一的异步队列,kind-of 就像一个带有异步接口(以及顺序保证、背压和其他东西)的 ConcurrentQueue。就像 ConcurrentQueue 一样,当多个消费者从队列中读取时,每个消费者都会收到一条新消息。要向多个消费者发送相同的消息,您必须广播它。
通用通道模式
通道的一个常见模式是每个处理方法仅使用一个 ChannelReader
作为输入传递,创建并拥有自己的通道,并 return 将其作为输出。这在 Go ( blog post and talk ) 中 非常 常见,其中通道广泛用于 producer/consumer 通信和管道。如果将 <-chan int
替换为 ChannelReader,您会发现几乎所有方法都会收到一个 ChannelReader 并且 return 是一个新方法。
这样处理方法可以控制通道的生命周期。当输入完成或工作被取消时,完成将传播给消费者。由于输出通道是由 worker 自己创建的,因此该方法可以完全控制其生命周期:
ChannelReader<string> Worker(ChannelReader<int> input,
CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
await writer.WriteAsync(msg.ToString(),token);
}
},token)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}
如果实际代码作为异步方法的 Func<TIn,TOut>
或 Func<TIn,Task<TOut>>
传递,则可以推广此样板代码:
ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
Func<TIn,CancellationToken,Task<TOut>> func,
CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
var result=await func(msg,token);
await writer.WriteAsync(result,token);
}
},token)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}
ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
Func<TIn,CancellationToken,TOut> func,
CancellationToken token=default)
{
...
var result=func(msg,token);
await writer.WriteAsync(result,token);
...
}
这可用于创建任何处理块,例如:
ChannelReader<int> step1Out=.....;
ChannelReader<int> step2Out=Work<int,int>(step1Out,async (i,token)=>{
await Task.Delay(i*1000,token);
return i;
});
ChannelReader<string> step3Out=Work<int,string>(step2Out,(i,token)=>{
var line=$"Data is {i}";
Console.WriteLine(line);
return line;
});
不产生任何输出的方法可以更简单但异步:
async Task Consume<TIn>(ChannelReader<TIn> input,
Action<TIn,CancellationToken> act,
CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAsync(token))
{
act(msg,token);
}
}
...
await Consume(step2Out,(i,token)=>Console.WriteLine($"Data is {i}"));
广播
可以采用这种简单的模式向 N 个消费者广播相同的消息,方法是创建 N 个通道并return发送给他们的读者:
IList<ChannelReader<T>> Broadcast<T>(ChannelReader<T> input, int count, CancellationToken token=default)
{
var channels=Enumerable.Range(0,count)
.Select(_=> Channel.CreateUnbounded<T>())
.ToList();
var writers=channels.Select(c=>c.Writer).ToList();
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
//Offer the message to all output channels
foreach(var w in writers)
{
await w.WriteAsync(msg,token);
}
}
},token)
.ContinueWith(t=>{
foreach(var w in writers)
{
writer.TryComplete(t.Exception);
}
});
return channels.Select(c=>c.Reader).ToList();
}
这样一来,就可以向多个消费者广播同一条消息了:
var broadcast=Broadcast<int,int>(step1Out,2);
var reader1=Consume(broadcast[0],(i,token)=>Console.WriteLine("Reader 0: {0}",i));
var reader2=Consume(broadcast[1],(i,token)=>Console.WriteLine("Reader 1: {0}",i));
甚至
var readers=broadcast.Select((b,idx)=>Consume(b,
(i,token)=>Console.WriteLine($"Reader {idx}: {i}"))
.ToList();
await Task.WhenAll(readers);
我有大量的东西通过 redis pub/sub 并且我需要将它分发到多个 websocket 连接,所以基本上无论何时来自 redis 的消息,它都需要通过所有 websockets 分发连接。
我想要多个消费者。他们每个人都应该收到所有消息。
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false
});
var cts = new CancellationTokenSource();
var producer = Task.Run(async () =>
{
int i = 0;
while (!cts.IsCancellationRequested)
{
channel.Writer.TryWrite(i++);
await Task.Delay(TimeSpan.FromMilliseconds(250));
}
});
var readerOneTask = Task.Run(async () =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Reader one: {i}");
}
});
var readerTwoTask = Task.Run(async () =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Reader two: {i}");
}
});
cts.CancelAfter(TimeSpan.FromSeconds(5));
Console.ReadLine();
单个 Channel<T>
无法向多个消费者广播消息。每次从通道中读取消息时,消息都会被消费,其他消费者将无法获取它。如果您想向所有消费者广播所有消息,则必须为每个消费者创建一个专用 Channel<T>
。
您可能会发现这个问题很有趣:Factory for IAsyncEnumerable or IAsyncEnumerator。它显示了为 IAsyncEnumerable<T>
序列实现 source/controller 的各种方法,其中包括通道和 Rx 主题。
更新: 下面演示了如何使用多个通道,以便将所有消息传播给所有消费者。
List<Channel<int>> channels = new();
async Task CreateConsumer(Func<Channel<int>, Task> body)
{
var channel = Channel.CreateUnbounded<int>();
lock (channels) channels.Add(channel);
try
{
await Task.Run(() => body(channel)).ConfigureAwait(false);
}
finally
{
lock (channels) channels.Remove(channel);
}
}
Task consumer1 = CreateConsumer(async channel =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer one: {i}");
}
});
Task consumer2 = CreateConsumer(async channel =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer two: {i}");
}
});
using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(3000));
Task producer = Task.Run(async () =>
{
int i = 0;
while (true)
{
i++;
lock (channels) channels.ForEach(channel => channel.Writer.TryWrite(i));
try { await Task.Delay(TimeSpan.FromMilliseconds(250), cts.Token); }
catch (OperationCanceledException) { break; }
}
});
producer.Wait();
lock (channels) channels.ForEach(channel => channel.Writer.Complete());
Task.WaitAll(consumer1, consumer2);
CreateConsumer
是一个异步方法,负责创建频道并将其添加到列表中。它还负责在消费者完成时从列表中删除频道。这很重要,否则如果消费者失败,生产者将继续在死通道中推送消息,从而导致内存泄漏。
消费者的“主体”(每个消费者可能不同)作为异步 lambda 传递给 CreateConsumer
方法。
在启动生产者之前启动所有消费者并创建他们的通道很重要。这就是 CreateConsumer
方法 而不是 包裹在 Task.Run
中的原因。这样 CreateConsumer
中的代码直到第一个 await
在调用 CreateConsumer
.
每次访问带有频道的列表都受到 lock
的保护,因为多个线程可能会同时尝试 read/modify 列表。
你问的是可以的,只是不是这样。
Channel 是一个单一的异步队列,kind-of 就像一个带有异步接口(以及顺序保证、背压和其他东西)的 ConcurrentQueue。就像 ConcurrentQueue 一样,当多个消费者从队列中读取时,每个消费者都会收到一条新消息。要向多个消费者发送相同的消息,您必须广播它。
通用通道模式
通道的一个常见模式是每个处理方法仅使用一个 ChannelReader
作为输入传递,创建并拥有自己的通道,并 return 将其作为输出。这在 Go ( blog post and talk ) 中 非常 常见,其中通道广泛用于 producer/consumer 通信和管道。如果将 <-chan int
替换为 ChannelReader,您会发现几乎所有方法都会收到一个 ChannelReader 并且 return 是一个新方法。
这样处理方法可以控制通道的生命周期。当输入完成或工作被取消时,完成将传播给消费者。由于输出通道是由 worker 自己创建的,因此该方法可以完全控制其生命周期:
ChannelReader<string> Worker(ChannelReader<int> input,
CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
await writer.WriteAsync(msg.ToString(),token);
}
},token)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}
如果实际代码作为异步方法的 Func<TIn,TOut>
或 Func<TIn,Task<TOut>>
传递,则可以推广此样板代码:
ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
Func<TIn,CancellationToken,Task<TOut>> func,
CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
var result=await func(msg,token);
await writer.WriteAsync(result,token);
}
},token)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}
ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
Func<TIn,CancellationToken,TOut> func,
CancellationToken token=default)
{
...
var result=func(msg,token);
await writer.WriteAsync(result,token);
...
}
这可用于创建任何处理块,例如:
ChannelReader<int> step1Out=.....;
ChannelReader<int> step2Out=Work<int,int>(step1Out,async (i,token)=>{
await Task.Delay(i*1000,token);
return i;
});
ChannelReader<string> step3Out=Work<int,string>(step2Out,(i,token)=>{
var line=$"Data is {i}";
Console.WriteLine(line);
return line;
});
不产生任何输出的方法可以更简单但异步:
async Task Consume<TIn>(ChannelReader<TIn> input,
Action<TIn,CancellationToken> act,
CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAsync(token))
{
act(msg,token);
}
}
...
await Consume(step2Out,(i,token)=>Console.WriteLine($"Data is {i}"));
广播
可以采用这种简单的模式向 N 个消费者广播相同的消息,方法是创建 N 个通道并return发送给他们的读者:
IList<ChannelReader<T>> Broadcast<T>(ChannelReader<T> input, int count, CancellationToken token=default)
{
var channels=Enumerable.Range(0,count)
.Select(_=> Channel.CreateUnbounded<T>())
.ToList();
var writers=channels.Select(c=>c.Writer).ToList();
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
//Offer the message to all output channels
foreach(var w in writers)
{
await w.WriteAsync(msg,token);
}
}
},token)
.ContinueWith(t=>{
foreach(var w in writers)
{
writer.TryComplete(t.Exception);
}
});
return channels.Select(c=>c.Reader).ToList();
}
这样一来,就可以向多个消费者广播同一条消息了:
var broadcast=Broadcast<int,int>(step1Out,2);
var reader1=Consume(broadcast[0],(i,token)=>Console.WriteLine("Reader 0: {0}",i));
var reader2=Consume(broadcast[1],(i,token)=>Console.WriteLine("Reader 1: {0}",i));
甚至
var readers=broadcast.Select((b,idx)=>Consume(b,
(i,token)=>Console.WriteLine($"Reader {idx}: {i}"))
.ToList();
await Task.WhenAll(readers);