在 .NET 中并行使用消息的有效方法

Efficient way to consume messages in parallel in .NET

我有一个消息流,基于某些标准,我希望每个消费者都能够并行处理其中的一些消息。每个消费者都应该能够动态订阅和取消订阅。

关于输入的更多信息:

到目前为止我有几个解决方案:

  1. 活动。
public class Message
{
    public Message(int id, string data)
    {
        Id = id;
        Data = data;
    }

    public int Id { get; }

    public string Data { get; }
}

public class ConsumersDispatcher
{
    public event EventHandler<Message> MessageReceived;

    public ConsumersDispatcher(int id)
    {
        Id = id;
    }

    public int Id { get; }

    public void OnMessageReceived(Message message)
    {
        if (MessageReceived == null)
        {
            return;
        }

        var delegates = MessageReceived.GetInvocationList();

        Parallel.ForEach(delegates, d => d.DynamicInvoke(this, message));
    }
}

public class Consumer
{
    private readonly ICollection<ConsumersDispatcher> _dispatchers;

    public Consumer(int id, string name)
    {
        Id = id;
        Name = name;
        _dispatchers = new List<ConsumersDispatcher>();
    }

    public int Id { get; }

    public string Name { get; }

    public void Subscribe(ConsumersDispatcher dispatcher)
    {
        if (_dispatchers.Any(m => m.Id == dispatcher.Id))
        {
            return;
        }

        _dispatchers.Add(dispatcher);
        dispatcher.MessageReceived += Foo;
    }

    private void Foo(object sender, Message message)
    {
        // process message
        Console.WriteLine($"{DateTime.Now} | Consumer: {Name} {Id} | Message: {message.Id} {message.Data} |#thread {Thread.CurrentThread.ManagedThreadId}");

        Thread.Sleep(1 * 1000);
    }
}

// Usage
 var consumersDispatcher = new ConsumersDispatcher(1);
 Consumer consumer1 = new Consumer(1, "A");
 consumer1.Subscribe(consumersDispatcher);
 Consumer consumer2 = new Consumer(2, "B");
 consumer2.Subscribe(consumersDispatcher);
 Consumer consumer3 = new Consumer(3, "C");

 var consumersDispatcher1 = new ConsumersDispatcher(2);

            for (int i = 0; i < 20; i++)
            {
                if (i % 2 == 0)
                {
                    var message = new Message(1, $"data {i}");
                    consumersDispatcher.OnMessageReceived(message);
                    continue;
                }

                var message1 = new Message(2, $"data {i}");
                consumersDispatcher1.OnMessageReceived(message1);
            }

  1. “消息调度程序”
public class MessageDispatcher

{
    private List<Consumer> _consumers;

    public MessageDispatcher(List<Consumer> consumers)
    {
        _consumers = consumers;
    }

    public void Dispatch(Message message)
    {
        IEnumerable<Consumer> consumers = _consumers.Where(a => a.Messages.Any(x => x.Id == message.Id));

        Parallel.ForEach(consumers, c => c.Foo(message));
    }
}

  1. Actor 模型(Akka.NET 或 Microsoft Orleans)

结论

有人可以建议我一个更优雅的解决方案吗?

此致

这看起来是 TPL Dataflow 库的一个很好的用例。它提供了一个基于参与者的编程模型,但比 Akka.NET 或 Microsoft Orleans 更轻量。您可以通过为每个消费者提供委托并将它们 link 一起提供过滤委托来创建几个内置数据流块。每个块都有自己的队列,您可以对其进行配置。一切都在内存中工作。

Rx.NET 是另一种选择。