在分布式系统中组织事件的执行并避免死锁

Organizing execution of events in a distributed system and avoid deadlocks

我在确定系统中事件的优先级时遇到问题。 我有一个简单的 class 可以订阅彼此的输出

public interface INode<TIn, TOut> : IBaseNode
{
    event EventHandler<TOut> Output; 
    //Note: subscribe just calls node.Output += this.OnInput
    void Subscribe(IBaseNode node);
    void OnInput(object sender, TIn input)
} 

使用这个我可以通过订阅它们的输出将节点链接在一起[=13​​=]

CarDealerNode.Subscribe(NewModelNode);
LoggerNode.Subscribe(CarDealerNode);

我的问题是,当事件触发时,它以半不确定的广度优先方式发生。我想维护这些事件的执行顺序,以便我可以以更动态的方式确定事件执行的优先级。

我的第一印象是使用一些优先级队列来对任务进行排序,但这可能会导致问题,因为优先级较低的事情可能永远不会执行

public class SynchronizationInfo
{
    public SyncPriority Priority { get; set; } = SyncPriority.Normal;
    public object Sender { get; set; }
    public DateTime Created { get; set; } = DateTime.Now;
    public Task Operation { get; set; }
}

public class SynchronizationContext
{
    public PriorityQueue<SynchronizationInfo> ExecutionQueue = new PriorityQueue<SynchronizationInfo>();
    //...
}

然而,我仍然难以掌握确保不会发生死锁的方法,如果以比执行该优先级更快的速度添加高优先级的东西,则低优先级的事件将不会发生' t执行。

此外,仅仅因为某些东西的优先级过低并不意味着所有优先级较高的东西都应该先处理,时间是一个重要因素。

是否有可靠有效的推荐方法来处理任务的优先执行。以一种没有任务经历死锁的方式,(例如,时间以一种将较低优先级上移以确保执行的方式增加优先级)?

目前,事件按照它们注册的顺序执行。然而,这只是它现在的实现方式,我不建议任何人依赖这种行为,因为在未来的版本中可能不会如此。

.NET 语言将事件实现隐藏在句法障碍后面,尽管事件处理是基于定义相对明确的委托系统。

从这里检查委托 Class: Delegate Class

即使在附加事件之后,您也可以更改顺序,方法是分离所有处理程序,然后按所需顺序重新附加。

注意:您可以通过更改事件的 addremove 操作来指定其他一些行为,从而覆盖事件的默认行为。然后,您可以将事件处理程序保存在您自己管理的列表中,并根据您喜欢的任何规则处理触发顺序。

Is there a solid efficient recommended way of handing priority execution of tasks. In a way that no task experiences dead-lock, (e.g time increases priority in a way in which lower priorities are moved up to assure execution)?

“时间增加优先级”方法的问题是需要一直重新计算优先级队列。

让我们回顾一下优先级队列的正常用例。以下表示数据结构中已排序项目的列表:

  • { 优先级 = SyncPriority.High, 创建时间 = "2021-03-05 12:34:01", ... }
  • { 优先级 = SyncPriority.High, 创建时间 = "2021-03-05 12:34:04", ... }
  • { 优先级 = SyncPriority.High, 创建时间 = "2021-03-05 12:34:06", ... }
  • { 优先级 = SyncPriority.Normal, 创建时间 = "2021-03-05 12:34:02", ... }
  • { 优先级 = SyncPriority.Normal, 创建时间 = "2021-03-05 12:34:05", ... }
  • { 优先级 = SyncPriority.Low, 创建时间 = "2021-03-05 12:34:03", ... }

我们可以判断事件每秒发生一次,从 HighNormalLow、... 优先级开始。当添加下一个 High 优先级时,我们可以将其插入到具有 Normal 优先级的第一个项目之前。数据结构的效率基于所有其他项目的顺序不变的事实。

如果我们将经过的时间(即不断增长的时间间隔)添加到混合中而不是创建时间,则必须针对下一个最高优先级项目的每个请求重新计算顺序。密钥基本上在时间上变为线性而不是常数。

为了避免这个难题及其固有的复杂性,您可以分而治之。允许使用窗口系统的几个简化之一。

最大优先级队列公开了以下成员:

  • 插入
  • 删除最大值

第一个简化示例,对优先级队列中的元素数量进行限制。

public class PriorityQueueWithMaxElements
{
    private readonly int _maxElementsInPriorityQueue;

    private readonly PriorityQueue<SynchronizationInfo> _priorityQueue;
    private readonly Queue<SynchronizationInfo> _backingQueue;

    public PriorityQueueWithMaxElements(int maxElementsInQueue)
    {
        _maxElementsInPriorityQueue = maxElementsInQueue;

        _priorityQueue = new PriorityQueue<SynchronizationInfo>();
        _backingQueue = new Queue<SynchronizationInfo>();
    }

    public void Insert(SynchronizationInfo info)
    {
        if (_backingQueue.Any() || _priorityQueue.Count == _maxElementsInPriorityQueue)
        {
            _backingQueue.Enqueue(info);
        }
        else
        {
            _priorityQueue.Insert(info);
        }
    }

    public SynchronizationInfo RemoveMax()
    {
        var max = _priorityQueue.RemoveMax();

        if (max == null && _backingQueue.Count > 0)
        {
            var numberOfItems = Math.Min(_maxElementsInPriorityQueue, _backingQueue.Count);
            for (var i = 0; i < numberOfItems; i++)
            {
                _priorityQueue.Insert(_backingQueue.Dequeue());
            }

            max = _priorityQueue.RemoveMax();
        }

        return max;
    }
}

第二个例子,对优先级队列中元素之间的最大时间延迟设置一个界限。

public class PriorityQueueWithMaxDelay
{
    private readonly TimeSpan _maxDelay;
    
    private readonly PriorityQueue<SynchronizationInfo> _priorityQueue;
    private readonly Queue<SynchronizationInfo> _backingQueue;

    // DateTime of oldest item in the priority-queue.
    private DateTime? _baseDateTime;

    public PriorityQueueWithMaxDelay(TimeSpan maxDelay)
    {
        _maxDelay = maxDelay;

        _priorityQueue = new PriorityQueue<SynchronizationInfo>();
        _backingQueue = new Queue<SynchronizationInfo>();
    }

    public void Insert(SynchronizationInfo info)
    {
        if (_baseDateTime == null)
        {
            _baseDateTime = info.Created;
        }

        if (_backingQueue.Any() || !IsWithinDelay(info))
        {
            _backingQueue.Enqueue(info);
        }
        else
        {
            _priorityQueue.Insert(info);
        }
    }

    public SynchronizationInfo RemoveMax()
    {
        var max = _priorityQueue.RemoveMax();

        if (max == null && _backingQueue.Count > 0)
        {
            _baseDateTime = _backingQueue.Peek().Created;

            var numberOfItems = _backingQueue.TakeWhile(IsWithinDelay).Count();
            for (var i = 0; i < numberOfItems; i++)
            {
                _priorityQueue.Insert(_backingQueue.Dequeue());
            }

            max = _priorityQueue.RemoveMax();
        }

        if (_priorityQueue.Count == 0)
        {
            _baseDateTime = null;
        }


        return max;
    }

    public bool IsWithinDelay(SynchronizationInfo info)
        => info.Created - _baseDateTime < _maxDelay;
}

请注意,一旦突破界限,优先级队列将在再次补充之前完全清空。这是为了避免低优先级项目无限期地保留在队列中。我将多线程支持留作 reader 的练习。可以说高性能的实现更容易实现,然后您将不得不重新计算每个请求的所有项目的顺序。

我们可以有更多队列,为什么要有一个队列?

这适用于任何恒定的优先级计数,尽管优先级足够低。 (据我所知,您对他们有一个 enum,因此可能只有几个优先级)。另外,我们不会使用优先级队列,而是使用几个普通队列。

  • 为每个优先级创建一个队列。任务根据其优先级注册到队列中。对于每个任务,都像@Funk 那样存储创建时间戳。
  • 当您希望处理下一个任务时,检查每个队列中可用元素的时间戳。
  • 这使您可以检测长期未完成的低优先级任务并提高它们的优先级。

可以通过多种方式提高优先级。例如:

  • 当队列足够长时,直接开始执行任务。 (例如 high_creation_time > medium_creation_time + C -> 运行 中优先级任务)
  • 将任务重新调度到优先级更高的队列,而不是直接运行ning。

哪种方式更适合你,很难说。

这种方法的复杂度:

  • 添加新任务:O(1) - 只需将其添加到各自的队列
  • 运行 任务:O(1) - 假设我们有固定数量的优先级,这只是检查所有队列并找到接下来应该 运行 的元素。
  • 重新安排任务(如果适用)- 一推一弹出,因此 O(1)