TPL 数据流中的动态 subscribe/unsubscribe

Dynamically subscribe/unsubscribe in TPL Dataflow

我有一个消息流,基于某些标准,我希望每个消费者都能够并行处理其中的一些消息。每个消费者都应该能够动态订阅和取消订阅。

我有以下输入数据限制:

到目前为止,这是我所拥有的:

public class Test
{
    static void Main()
    {
        var consumer1 = new Consumer("Consumer1");
        consumer1.SubscribeForCategory(1);
        consumer1.SubscribeForCategory(2);

        var consumer2 = new Consumer("Consumer2");
        consumer2.SubscribeForCategory(2);
        consumer2.SubscribeForCategory(3);
        consumer2.SubscribeForCategory(4);

        var consumer3 = new Consumer("Consumer3");
        consumer3.SubscribeForCategory(3);
        consumer3.SubscribeForCategory(4);

        var consumers = new[] {consumer1, consumer2, consumer3};
        var publisher = new Publisher(consumers);

        var message1 = new Message(1, "message1 test");
        var message2 = new Message(2, "message2");
        var message3 = new Message(1, "message3");
        var message4 = new Message(3, "message4 test");
        var message5 = new Message(4, "message5");
        var message6 = new Message(3, "message6");

        var messages = new[] {message1, message2, message3, message4, message5, message6};

        foreach (var message in messages)
        {
            publisher.Publish(message);
        }

        Console.ReadLine();
    }
}

public class Message
{
    public Message(int categoryId, string data)
    {
        CategoryId = categoryId;
        Data = data;
    }

    public int CategoryId { get; }

    public string Data { get; }
}

public class Publisher
{
    private readonly IEnumerable<Consumer> _consumers;

    public Publisher(IEnumerable<Consumer> consumers)
    {
        _consumers = consumers;
    }

    public void Publish(Message message)
    {
        IEnumerable<Consumer> consumers = _consumers.Where(c => c.CategoryIds.Contains(message.CategoryId));
        foreach (Consumer consumer in consumers)
        {
            consumer.AddMessage(message);
        }
    }
}

public class Consumer
{
    private readonly HashSet<int> _categoryIds;
    private readonly ActionBlock<Message> _queue;

    public Consumer(string name)
    {
        Name = name;
        _categoryIds = new HashSet<int>();

        _queue = new ActionBlock<Message>(async m => { await Foo(m); }, 
                                          new ExecutionDataflowBlockOptions 
                                          {
                                              MaxDegreeOfParallelism = 1, 
                                              SingleProducerConstrained = true
                                          });
    }

    public string Name { get; }

    public IReadOnlyCollection<int> CategoryIds => _categoryIds;

    public void AddMessage(Message message)
    {
        bool accepted = _queue.Post(message);
        if (!accepted)
        {
            Console.WriteLine("Message has been rejected!");
        }
    }

    public void SubscribeForCategory(int categoryId)
    {
        _categoryIds.Add(categoryId);
    }

    private async Task Foo(Message message)
    {
        // process message
        await Task.Delay(10);

        if (message.Data.Contains("test"))
        {
            _categoryIds.Remove(message.CategoryId);
        }

        Console.WriteLine($"[{Name}] - category id: [{message.CategoryId}] data: [{message.Data}]");
    }
}

不幸的是,该解决方案存在几个问题:

  1. 当消费者处理每条消息时,有可能取消订阅某些已添加到 ActionBlock 输入队列的消息。
  2. 在 Publisher.cs 中,我遍历每个帐户类别集合,稍后在 Account Foo 方法中,有机会删除一些类别,这将导致以下异常:System.InvalidOperationException:集合被修改;枚举操作可能无法执行。
  3. 我也不太确定在 publisher.Publish()
  4. 中加入“调度逻辑”是否是个好主意

一个可能的解决方案是将所有消息转发给每个消费者(并且每个消费者应该决定是否应该处理它)但我担心这会慢得多。

我知道 Akka.Net 和 Microsoft Orleans 等基于 actor 模型的框架,但我希望所有这些都在进程中发生(当然如果可以实现的话)。

谁有更优雅的解决方案?您对我如何改进当前方法有什么建议吗?

我认为您的模型中缺少实体 Category,添加它不仅会在概念上而且会在性能方面改进您的模型。每个类别都可以包含订阅该类别的消费者列表,这样就可以轻松地只向订阅的消费者发送消息。

为了解决线程安全问题,我的建议是使用已经拥有 1,000,000 个项目的 immutable collections instead of mutable HashSet<T>s or List<T>s. The immutable collections offer the advantage that they can be updated safely and atomically with low-lock techniques (ImmutableInterlocked.Update method), and can provide at any time a snapshot of their contents that is unaffected by future modifications. If you are asking how it is possible to mutate an immutable collection, the answer is that you are not mutating it, instead you are replacing the reference with a different immutable collection. These structures are implemented in a way that allows high reusability of their internal bits and pieces. For example adding an item in a ImmutableHashSet<T>,不需要分配包含所有旧项目和新项目的新内存块.只会分配少量小对象(内部二叉树中的节点)。

这种便利是有代价的:大多数对不可变集合的操作比对可变集合的相同操作至少慢 10 倍。很可能这种开销在事物的宏伟计划中可以忽略不计,但您可能想要自己分析和衡量它,并判断它是否有影响。

Category class:

public class Category
{
    private ImmutableHashSet<Consumer> _consumers;

    public int Id { get; }
    public ImmutableHashSet<Consumer> Consumers => Volatile.Read(ref _consumers);

    public Category(int id)
    {
        this.Id = id;
        _consumers = ImmutableHashSet.Create<Consumer>();
    }

    public void SubscribeConsumer(Consumer consumer) =>
        ImmutableInterlocked.Update(ref _consumers, col => col.Add(consumer));

    public void UnsubscribeConsumer(Consumer consumer) =>
        ImmutableInterlocked.Update(ref _consumers, col => col.Remove(consumer));
}

注意 Volatile.Read,它确保存储在 _consumers 字段中的最新引用将立即对访问 Consumers 属性 的所有线程可见。

Consumer class:

public class Consumer
{
    private readonly ActionBlock<Message> _block;
    private IImmutableList<Category> _categories;

    public string Name { get; }
    public IImmutableList<Category> Categories => Volatile.Read(ref _categories);

    public Consumer(string name)
    {
        this.Name = name;
        _categories = ImmutableArray.Create<Category>();
        _block = new ActionBlock<Message>(async message =>
        {
            if (!Categories.Any(cat => cat.Id == message.CategoryId)) return;
            // Process message...
        });
    }

    public void SendMessage(Message message)
    {
        bool accepted = _block.Post(message);
        Debug.Assert(accepted);
    }

    public void SubscribeForCategory(Category category)
    {
        ImmutableInterlocked.Update(ref _categories, col => col.Add(category));
        category.SubscribeConsumer(this);
    }

    public void UnsubscribeForCategory(Category category)
    {
        ImmutableInterlocked.Update(ref _categories, col => col.Remove(category));
        category.UnsubscribeConsumer(this);
    }
}

请注意,SubscribeForCategory 方法还负责添加反向关系(类别 -> 消费者)。在上面的实现中,这两个关系并不是以原子方式相互添加的,这意味着观察者可以看到消费者订阅了一个类别,而该类别没有订阅消费者。从你的描述来看,你的应用中似乎不存在这样的观察者,所以这种不一致可能无关紧要。

Publisher class 需要保存类别列表,而不是消费者:

public class Publisher
{
    private readonly Dictionary<int, Category> _categories;

    public Publisher(IEnumerable<Category> categories)
    {
        _categories = categories.ToDictionary(cat => cat.Id);
    }

    public void Publish(Message message)
    {
        var category = _categories[message.CategoryId];
        foreach (Consumer consumer in category.Consumers)
            consumer.SendMessage(message);
    }
}

注意 Publish 方法是多么简单。

TPL DataFlow 库已经提供了您想要的内容。它的块不是队列,它们是实际的生产者和消费者。您可以删除几乎 所有 添加的代码。您甚至可以使用 LINQ 查询来创建和 link“发布者”和“消费者”:

var n=10;
var consumers=( from i in Enumerable.Range(0,n)
                let categories=new ConcurrentDictinoary<int,int>()
                select new { 
                             Block=new ActionBlock(msg=>Consume(msg,categories)
                                                        ,blockOptions),
                             Categories=categories
                }).ToArray();

foreach(var pair in consumers)
{
    publisher.LinkTo(pair.Block,linkOption,msg=>IsAllowed(msg,pair.Category));
}

bool IsAllowed(Message msg,ConcurrentDictionary<int,int> categories)
{
    return categories.ContainsKey(msg.CategoryId);
}

async Task Consume(Message message,ConcurrentDictinary<int,int> categories)
{
    if (message.Data.Contains("test"))
    {
        categories.TryRemove(message.CategoryId);
    }
    ...
}

块与函数一起工作绝非偶然。 Dataflow 库和它所基于的 CSP 范式与 OOP 有很大不同,更接近于函数式编程。

顺便说一下,TPL 数据流源自 Microsoft Robotics Frameworks 和 Concurrency Runtime。在机器人技术和自动化领域,有 lot 微处理器交换消息。数据流 它专门用于创建复杂的处理网格和处理大量消息。

说明

Dataflow 不是一组队列,它包含要在管道中 linked 的活动块。 ActionBlock 不是队列,它 队列。实际上它是一个消费者,通常位于管道的尾部。 TransformBlock 接收传入的消息,一条一条地处理它们,然后将它们发送到任何 linked 块。

块是 linked,所以你不需要手动从一个块中获取消息并将它们传递到另一个块。 Link 可以包含一个谓词,用于过滤目标块接受的消息。可以通过调用 Dispose 来剪切 link。

假设这是“消费者”方法:

async Task Consume(Message message)
{
    await Task.Delay(100);
    Console.WriteLine($"Category id: [{message.CategoryId}] data: [{message.Data}]");
}

您可以创建几个 ActionBlock,也许在一个数组中:

var consumers=new[]{
     new ActionBlock(Consume),
     new ActionBlock(Consume),
     new ActionBlock(Consume)
};

每个动作块当然可以使用不同的委托。

管道的“头部”可能应该是一个 TransformBlock。在这种情况下,发布者除了得到 linked 到目标块之外什么都不做。至少我们可以打印一些东西:

Message PassThrough(Message message)
{
    Console.WriteLine("Incoming");
    return Message;
}

var publisher=new TransformBlock(PassThrough);

您可以 link 通过 LinkTo 将“发布者”传递给“消费者” :

var options=new DataflowLinkOptions { PropagateCompletion=true};

var link1=publisher.LinkTo(consumers[0],options, msg=>msg.CategoryId % 3==0);
var link2=publisher.LinkTo(consumers[1],options, msg=>msg.CategoryId % 3==1);
var link3=publisher.LinkTo(consumers[2],options, msg=>msg.CategoryId % 3==2);

“发布者”块生成的消息将被发送到第一个 link 谓词接受它的目标。消息按创建顺序提供给 links。如果没有link接受该消息,它将留在输出队列中并阻止它。

在实际场景中,应始终确保所有消息都得到处理,或者有一个块可以处理任何不匹配的内容。

public.LinkTo(theOtherBlock,options);

link1link2link3 对象只是 IDisposeable。它们可以用来打破 link :

link2.Dispose();

Link 可以随时创建和破坏,根据需要更改管道的形状(或更复杂设计中的网格)。如果 link 被破坏或修改,任何已经发布到目标块队列的消息都不会被丢弃。

为了减少不需要的消息的数量,我们可以为每个块的输入队列添加一个绑定:

var blockOptions=new DataflowBlockOptions { BoundedCapacity=1 };

var consumers=new[]{
     new ActionBlock(Consume,blockOptions),
     new ActionBlock(Consume,blockOptions),
     new ActionBlock(Consume,blockOptions)
};

要动态更改接受的消息,我们可以将值存储在例如 ConcurrentDictionary 中。谓词可能会在消费者修改允许值的同时尝试检查消息:

ConcurrentDictionary[] _allowedCategories=new[] {
    new ConcurrentDictionary<int,int>(),
    new ConcurrentDictionary<int,int>(),
    new ConcurrentDictionary<int,int>(),
};

async Task Consume(Message message,ConcurrentDictinary<int,int> categories)
{
    if (message.Data.Contains("test"))
    {
        categories.TryRemove(message.CategoryId);
    }
    ...
}

“消费者”变为

var consumers=new[]{
     new ActionBlock(msg=>Consume(msg,categories[0])),
     new ActionBlock(msg=>Consume(msg,categories[1])),
     new ActionBlock(msg=>Consume(msg,categories[2]))
};

最好为 link 谓词创建一个单独的方法:

bool IsAllowed(Message msg,ConcurrentDictionary<int,int> categories)
{
    return categories.ContainsKey(msg.CategoryId);
}

var link1=publisher.LinkTo(consumers[0],options, msg=>IsAllowed(msg,categories[0]));
var link2=publisher.LinkTo(consumers[1],options, msg=>IsAllowed(msg,categories[1]));

可以使用 LINQ 和 `Enumerable.Range 创建所有这些。这是否是个好主意是另一回事:

var n=10;
var consumers=( from i in Enumerable.Range(0,n)
                         let categories=new ConcurrentDictinoary<int,int>()
                         select new { 
                             Block=new ActionBlock(msg=>Consume(msg,categories)
                                                        ,blockOptions),
                             Categories=categories
                         }).ToArray();

foreach(var pair in consumers)
{
    publisher.LinkTo(pair.Block,linkOption,msg=>IsAllowed(msg,pair.Category));
}

无论网格如何构建,发布到它都是一样的。在头部块

上使用SendAsync
for(int i=0;i<1000;i++)
{
    var msg=new Message(...);
    await publisher.SendAsync(msg);
}