在 .NET 中并行使用消息的有效方法
Efficient way to consume messages in parallel in .NET
我有一个消息流,基于某些标准,我希望每个消费者都能够并行处理其中的一些消息。每个消费者都应该能够动态订阅和取消订阅。
关于输入的更多信息:
- 我每秒收到大约 500 条消息
- 我有大约 15000 名消费者
到目前为止我有几个解决方案:
- 活动。
public class Message
{
public Message(int id, string data)
{
Id = id;
Data = data;
}
public int Id { get; }
public string Data { get; }
}
public class ConsumersDispatcher
{
public event EventHandler<Message> MessageReceived;
public ConsumersDispatcher(int id)
{
Id = id;
}
public int Id { get; }
public void OnMessageReceived(Message message)
{
if (MessageReceived == null)
{
return;
}
var delegates = MessageReceived.GetInvocationList();
Parallel.ForEach(delegates, d => d.DynamicInvoke(this, message));
}
}
public class Consumer
{
private readonly ICollection<ConsumersDispatcher> _dispatchers;
public Consumer(int id, string name)
{
Id = id;
Name = name;
_dispatchers = new List<ConsumersDispatcher>();
}
public int Id { get; }
public string Name { get; }
public void Subscribe(ConsumersDispatcher dispatcher)
{
if (_dispatchers.Any(m => m.Id == dispatcher.Id))
{
return;
}
_dispatchers.Add(dispatcher);
dispatcher.MessageReceived += Foo;
}
private void Foo(object sender, Message message)
{
// process message
Console.WriteLine($"{DateTime.Now} | Consumer: {Name} {Id} | Message: {message.Id} {message.Data} |#thread {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1 * 1000);
}
}
// Usage
var consumersDispatcher = new ConsumersDispatcher(1);
Consumer consumer1 = new Consumer(1, "A");
consumer1.Subscribe(consumersDispatcher);
Consumer consumer2 = new Consumer(2, "B");
consumer2.Subscribe(consumersDispatcher);
Consumer consumer3 = new Consumer(3, "C");
var consumersDispatcher1 = new ConsumersDispatcher(2);
for (int i = 0; i < 20; i++)
{
if (i % 2 == 0)
{
var message = new Message(1, $"data {i}");
consumersDispatcher.OnMessageReceived(message);
continue;
}
var message1 = new Message(2, $"data {i}");
consumersDispatcher1.OnMessageReceived(message1);
}
- “消息调度程序”
public class MessageDispatcher
{
private List<Consumer> _consumers;
public MessageDispatcher(List<Consumer> consumers)
{
_consumers = consumers;
}
public void Dispatch(Message message)
{
IEnumerable<Consumer> consumers = _consumers.Where(a => a.Messages.Any(x => x.Id == message.Id));
Parallel.ForEach(consumers, c => c.Foo(message));
}
}
- Actor 模型(Akka.NET 或 Microsoft Orleans)
结论
- 如果我处理事件,我将耦合我的对象(我不喜欢),据我所知 DynamicInvoke() 使用延迟绑定,这可能很慢或可能不会(我必须做一些性能测试)。
- 第二个解决方案看起来比第一个慢得多。
- Actor 模型看起来正是我需要的。每个消费者都有自己的本地队列,并且是并行执行的。问题是我没有使用任何actor模型,据我了解有很多配置(它应该为Kubernetes配置)和努力。
有人可以建议我一个更优雅的解决方案吗?
此致
这看起来是 TPL Dataflow 库的一个很好的用例。它提供了一个基于参与者的编程模型,但比 Akka.NET 或 Microsoft Orleans 更轻量。您可以通过为每个消费者提供委托并将它们 link 一起提供过滤委托来创建几个内置数据流块。每个块都有自己的队列,您可以对其进行配置。一切都在内存中工作。
Rx.NET 是另一种选择。
我有一个消息流,基于某些标准,我希望每个消费者都能够并行处理其中的一些消息。每个消费者都应该能够动态订阅和取消订阅。
关于输入的更多信息:
- 我每秒收到大约 500 条消息
- 我有大约 15000 名消费者
到目前为止我有几个解决方案:
- 活动。
public class Message
{
public Message(int id, string data)
{
Id = id;
Data = data;
}
public int Id { get; }
public string Data { get; }
}
public class ConsumersDispatcher
{
public event EventHandler<Message> MessageReceived;
public ConsumersDispatcher(int id)
{
Id = id;
}
public int Id { get; }
public void OnMessageReceived(Message message)
{
if (MessageReceived == null)
{
return;
}
var delegates = MessageReceived.GetInvocationList();
Parallel.ForEach(delegates, d => d.DynamicInvoke(this, message));
}
}
public class Consumer
{
private readonly ICollection<ConsumersDispatcher> _dispatchers;
public Consumer(int id, string name)
{
Id = id;
Name = name;
_dispatchers = new List<ConsumersDispatcher>();
}
public int Id { get; }
public string Name { get; }
public void Subscribe(ConsumersDispatcher dispatcher)
{
if (_dispatchers.Any(m => m.Id == dispatcher.Id))
{
return;
}
_dispatchers.Add(dispatcher);
dispatcher.MessageReceived += Foo;
}
private void Foo(object sender, Message message)
{
// process message
Console.WriteLine($"{DateTime.Now} | Consumer: {Name} {Id} | Message: {message.Id} {message.Data} |#thread {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1 * 1000);
}
}
// Usage
var consumersDispatcher = new ConsumersDispatcher(1);
Consumer consumer1 = new Consumer(1, "A");
consumer1.Subscribe(consumersDispatcher);
Consumer consumer2 = new Consumer(2, "B");
consumer2.Subscribe(consumersDispatcher);
Consumer consumer3 = new Consumer(3, "C");
var consumersDispatcher1 = new ConsumersDispatcher(2);
for (int i = 0; i < 20; i++)
{
if (i % 2 == 0)
{
var message = new Message(1, $"data {i}");
consumersDispatcher.OnMessageReceived(message);
continue;
}
var message1 = new Message(2, $"data {i}");
consumersDispatcher1.OnMessageReceived(message1);
}
- “消息调度程序”
public class MessageDispatcher
{
private List<Consumer> _consumers;
public MessageDispatcher(List<Consumer> consumers)
{
_consumers = consumers;
}
public void Dispatch(Message message)
{
IEnumerable<Consumer> consumers = _consumers.Where(a => a.Messages.Any(x => x.Id == message.Id));
Parallel.ForEach(consumers, c => c.Foo(message));
}
}
- Actor 模型(Akka.NET 或 Microsoft Orleans)
结论
- 如果我处理事件,我将耦合我的对象(我不喜欢),据我所知 DynamicInvoke() 使用延迟绑定,这可能很慢或可能不会(我必须做一些性能测试)。
- 第二个解决方案看起来比第一个慢得多。
- Actor 模型看起来正是我需要的。每个消费者都有自己的本地队列,并且是并行执行的。问题是我没有使用任何actor模型,据我了解有很多配置(它应该为Kubernetes配置)和努力。
有人可以建议我一个更优雅的解决方案吗?
此致
这看起来是 TPL Dataflow 库的一个很好的用例。它提供了一个基于参与者的编程模型,但比 Akka.NET 或 Microsoft Orleans 更轻量。您可以通过为每个消费者提供委托并将它们 link 一起提供过滤委托来创建几个内置数据流块。每个块都有自己的队列,您可以对其进行配置。一切都在内存中工作。
Rx.NET 是另一种选择。