多个消费者不丢失消息

Multiple consumers without losing messages

我有大量的东西通过 redis pub/sub 并且我需要将它分发到多个 websocket 连接,所以基本上无论何时来自 redis 的消息,它都需要通过所有 websockets 分发连接。

我想要多个消费者。他们每个人都应该收到所有消息。

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = false
});
var cts = new CancellationTokenSource();


var producer = Task.Run(async () =>
{
    int i = 0;
    while (!cts.IsCancellationRequested)
    {
        channel.Writer.TryWrite(i++);

        await Task.Delay(TimeSpan.FromMilliseconds(250));
    }
});

var readerOneTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader one: {i}");
    }
});

var readerTwoTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader two: {i}");
    }
});

cts.CancelAfter(TimeSpan.FromSeconds(5));

Console.ReadLine();

单个 Channel<T> 无法向多个消费者广播消息。每次从通道中读取消息时,消息都会被消费,其他消费者将无法获取它。如果您想向所有消费者广播所有消息,则必须为每个消费者创建一个专用 Channel<T>

您可能会发现这个问题很有趣:Factory for IAsyncEnumerable or IAsyncEnumerator。它显示了为 IAsyncEnumerable<T> 序列实现 source/controller 的各种方法,其中包括通道和 Rx 主题。


更新: 下面演示了如何使用多个通道,以便将所有消息传播给所有消费者。

List<Channel<int>> channels = new();

async Task CreateConsumer(Func<Channel<int>, Task> body)
{
    var channel = Channel.CreateUnbounded<int>();
    lock (channels) channels.Add(channel);
    try
    {
        await Task.Run(() => body(channel)).ConfigureAwait(false);
    }
    finally
    {
        lock (channels) channels.Remove(channel);
    }
}

Task consumer1 = CreateConsumer(async channel =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Consumer one: {i}");
    }
});

Task consumer2 = CreateConsumer(async channel =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Consumer two: {i}");
    }
});

using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(3000));
Task producer = Task.Run(async () =>
{
    int i = 0;
    while (true)
    {
        i++;
        lock (channels) channels.ForEach(channel => channel.Writer.TryWrite(i));
        try { await Task.Delay(TimeSpan.FromMilliseconds(250), cts.Token); }
        catch (OperationCanceledException) { break; }
    }
});

producer.Wait();
lock (channels) channels.ForEach(channel => channel.Writer.Complete());
Task.WaitAll(consumer1, consumer2);

Try it on Fiddle.

CreateConsumer是一个异步方法,负责创建频道并将其添加到列表中。它还负责在消费者完成时从列表中删除频道。这很重要,否则如果消费者失败,生产者将继续在死通道中推送消息,从而导致内存泄漏。

消费者的“主体”(每个消费者可能不同)作为异步 lambda 传递给 CreateConsumer 方法。

在启动生产者之前启动所有消费者并创建他们的通道很重要。这就是 CreateConsumer 方法 而不是 包裹在 Task.Run 中的原因。这样 CreateConsumer 中的代码直到第一个 await 在调用 CreateConsumer.

的同一个线程上同步运行

每次访问带有频道的列表都受到 lock 的保护,因为多个线程可能会同时尝试 read/modify 列表。

你问的是可以的,只是不是这样。

Channel 是一个单一的异步队列,kind-of 就像一个带有异步接口(以及顺序保证、背压和其他东西)的 ConcurrentQueue。就像 ConcurrentQueue 一样,当多个消费者从队列中读取时,每个消费者都会收到一条新消息。要向多个消费者发送相同的消息,您必须广播它。

通用通道模式

通道的一个常见模式是每个处理方法仅使用一个 ChannelReader 作为输入传递,创建并拥有自己的通道,并 return 将其作为输出。这在 Go ( blog post and talk ) 中 非常 常见,其中通道广泛用于 producer/consumer 通信和管道。如果将 <-chan int 替换为 ChannelReader,您会发现几乎所有方法都会收到一个 ChannelReader 并且 return 是一个新方法。

这样处理方法可以控制通道的生命周期。当输入完成或工作被取消时,完成将传播给消费者。由于输出通道是由 worker 自己创建的,因此该方法可以完全控制其生命周期:

ChannelReader<string> Worker(ChannelReader<int> input,
                             CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;
   
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(msg.ToString(),token);
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel.Reader;
}

如果实际代码作为异步方法的 Func<TIn,TOut>Func<TIn,Task<TOut>> 传递,则可以推广此样板代码:

ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
                             Func<TIn,CancellationToken,Task<TOut>> func,  
                             CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;
   
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            var result=await func(msg,token);
            await writer.WriteAsync(result,token);
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel.Reader;
}

ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
                             Func<TIn,CancellationToken,TOut> func,  
                             CancellationToken token=default)
{
...
            var result=func(msg,token);
            await writer.WriteAsync(result,token);
...
}

这可用于创建任何处理块,例如:

ChannelReader<int> step1Out=.....;
ChannelReader<int> step2Out=Work<int,int>(step1Out,async (i,token)=>{
    await Task.Delay(i*1000,token);
    return i;
});
ChannelReader<string> step3Out=Work<int,string>(step2Out,(i,token)=>{
    var line=$"Data is {i}";
    Console.WriteLine(line);
    return line;
});

不产生任何输出的方法可以更简单但异步:

async Task Consume<TIn>(ChannelReader<TIn> input,
                             Action<TIn,CancellationToken> act,  
                             CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        act(msg,token);
    }
}

...

await Consume(step2Out,(i,token)=>Console.WriteLine($"Data is {i}"));

广播

可以采用这种简单的模式向 N 个消费者广播相同的消息,方法是创建 N 个通道并return发送给他们的读者:

IList<ChannelReader<T>> Broadcast<T>(ChannelReader<T> input, int count, CancellationToken token=default)
{
    var channels=Enumerable.Range(0,count)
                           .Select(_=> Channel.CreateUnbounded<T>())
                           .ToList();
    var writers=channels.Select(c=>c.Writer).ToList();
   
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {            
            //Offer the message to all output channels
            foreach(var w in writers)
            {
                await w.WriteAsync(msg,token);
            }
        }
    },token)
    .ContinueWith(t=>{
            foreach(var w in writers)
            {
                writer.TryComplete(t.Exception);
            }
    });

    return channels.Select(c=>c.Reader).ToList();
}

这样一来,就可以向多个消费者广播同一条消息了:

var broadcast=Broadcast<int,int>(step1Out,2);
var reader1=Consume(broadcast[0],(i,token)=>Console.WriteLine("Reader 0: {0}",i));
var reader2=Consume(broadcast[1],(i,token)=>Console.WriteLine("Reader 1: {0}",i));

甚至

var readers=broadcast.Select((b,idx)=>Consume(b,
                         (i,token)=>Console.WriteLine($"Reader {idx}: {i}"))
                     .ToList();
await Task.WhenAll(readers);