Interlocked CompareExchange 能否在此多线程循环实现中正确使用?

Can Interlocked CompareExchange be used correctly in this multithreaded round-robin implementation?

由于多线程上下文中的某些速率限制,我需要在 N 个不同连接之间循环调用某些调用。我决定使用一个列表和一个“计数器”来实现这个功能,它应该在每次调用的实例之间“跳一个”。

我将用一个最小的例子来说明这个概念(使用一个名为 A 的 class 来代表连接)

class A
{
    public A()
    {
        var newIndex = Interlocked.Increment(ref index);
        ID = newIndex.ToString();
    }
    private static int index;
    public string ID;
}

static int crt = 0;
static List<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static int itemsCount = Items.Count;

static A GetInstance()
{            
    var newIndex = Interlocked.Increment(ref crt);
    var instance = Items[newIndex % itemsCount];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

static void Test()
{
    var sw = Stopwatch.StartNew();

    var tasks = Enumerable.Range(1, 1000000).Select(i => Task.Run(GetInstance)).ToArray();
    Task.WaitAll(tasks);
}

这按预期工作,因为它确保调用在连接之间循环。我可能会在“真实”代码中坚持这个实现(用 long 而不是 int 作为计数器)

然而,即使在我的用例中不太可能达到 int.MaxValue,我想知道是否有办法“安全地溢出”计数器。

我知道 C# 中的“%”是“余数”而不是“模数”,这意味着某些 ?: 体操需要始终 return 正数,我想避免这种情况。

所以我想得出的结果是:

static A GetInstance()
{            
    var newIndex = Interlocked.Increment(ref crt);
    Interlocked.CompareExchange(ref crt, 0, itemsCount); //?? the return value is the original value, how to know if it succeeded
    var instance = Items[newIndex];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

我期望的是 Interlocked.CompareExchange(ref crt, 0, itemsCount) 将仅由一个线程“赢得”,一旦达到可用连接数就将计数器设置回 0。但是,我不知道如何在这种情况下使用它。

这里可以使用 CompareExchange 或 Interlocked 中的其他机制吗?

不,Interlocked class 没有提供允许您在 Int32 值溢出时将其恢复为零的机制。原因是两个线程有​​可能同时调用 var newIndex = Interlocked.Increment(ref crt); 语句,在这种情况下都溢出计数器,然后 none 将成功地将值更新回零。此功能刚好超出 Interlocked class 的功能。要使此类复杂操作成为原子操作,您需要使用其他一些同步机制,例如 lock.


更新: xanatos 的 proves that the above statement is wrong. It is also proven wrong by the answers of this 9-year old question. Below are two implementation of an InterlockedIncrementRoundRobin method. The first is a simplified version of this 回答,Alex Sorokoletov:

public static int InterlockedRoundRobinIncrement(ref int location, int modulo)
{
    // Arguments validation omitted (the modulo should be a positive number)
    uint current = unchecked((uint)Interlocked.Increment(ref location));
    return (int)(current % modulo);
}

这个实现非常有效,但它的缺点是支持 int 值不能直接使用,因为它遍历了 Int32 类型的整个范围(包括负值) .可用信息来自方法本身的 return 值,保证在 [0..modulo] 范围内。如果您想读取当前值而不增加它,您需要另一个类似的方法来执行相同的 int -> uint -> int 转换:

public static int InterlockedRoundRobinRead(ref int location, int modulo)
{
    uint current = unchecked((uint)Volatile.Read(ref location));
    return (int)(current % modulo);
}

它还有一个缺点,即每 4,294,967,296 递增一次,除非 modulo 是 2 的幂,否则它 return 在达到 modulo - 1值。换句话说,翻转逻辑在技术上存在缺陷。这可能是也可能不是大问题,具体取决于应用程序。

第二个实现是 xanatos 的修改版本 :

public static int InterlockedRoundRobinIncrement(ref int location, int modulo)
{
    // Arguments validation omitted (the modulo should be a positive number)
    while (true)
    {
        int current = Interlocked.Increment(ref location);
        if (current >= 0 && current < modulo) return current;

        // Overflow. Try to zero the number.
        while (true)
        {
            int current2 = Interlocked.CompareExchange(ref location, 0, current);
            if (current2 == current) return 0; // Success
            current = current2;
            if (current >= 0 && current < modulo)
            {
                break; // Another thread zeroed the number. Retry increment.
            }
        }
    }
}

这效率稍低(尤其是对于较小的 modulo 值),因为偶尔 Interlocked.Increment 操作会导致值超出范围,并且该值会被拒绝并重复操作。它确实有优势,尽管支持 int 值保持在 [0..modulo] 范围内,除了一些非常短的时间跨度,在此方法的某些调用期间。

你可能会:

static int crt = -1;
static readonly IReadOnlyList<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static readonly int itemsCount = Items.Count;
static readonly int maxItemCount = itemsCount * 100;

static A GetInstance()
{
    int newIndex;

    while (true)
    {
        newIndex = Interlocked.Increment(ref crt);

        if (newIndex >= itemsCount)
        {
            while (newIndex >= itemsCount && Interlocked.CompareExchange(ref crt, -1, newIndex) != newIndex)
            {
                // There is an implicit memory barrier caused by the Interlockd.CompareExchange around the
                // next line
                // See for example https://afana.me/archive/2015/07/10/memory-barriers-in-dot-net.aspx/
                // A full memory barrier is the strongest and interesting one. At least all of the following generate a full memory barrier implicitly:
                // Interlocked class mehods
                newIndex = crt;
            }

            continue;
        }

        break;
    }

    var instance = Items[newIndex % itemsCount];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

但我不得不说实话...我不确定它是否正确(应该是),解释它很难,如果有人以任何方式触摸它,它就会坏掉。

基本思想是为 crt 设置一个“低”上限(我们不想溢出,它会破坏一切......所以我们想让 veeeeeery 远离 int.MaxValue, 或者你可以使用 uint).

最大可能值为:

maxItemCount = (int.MaxValue - MaximumNumberOfThreads) / itemsCount * itemsCount;

/ itemsCount * itemsCount 是因为我们希望轮次平均分配。在我给出的示例中,我使用的数字可能要低得多(itemsCount * 100),因为降低此上限只会导致更频繁地重置,但重置不会慢到真正重要的程度(这取决于什么你在线程上做。如果它们是非常小的线程,只使用 cpu 那么重置很慢,但如果不是那么它不是。

然后当我们溢出这个上限时,我们尝试将它移回 -1(我们的起点)。我们知道同时其他坏的坏线程可以 Interlocked.Increment 它并在这个重置上创建一个竞争。由于 Interlocked.CompareExchange 只有一个线程可以成功重置计数器,但其他线程会立即看到这一点并中断它们的尝试。

嗯... if可以重写为:

if (newIndex >= itemsCount)
{
    int newIndex2;
    while (newIndex >= itemsCount && (newIndex2 = Interlocked.CompareExchange(ref crt, 0, newIndex)) != newIndex)
    {
        // If the Interlocked.CompareExchange is successfull, the while will end and so we won't be here,
        // if it fails, newIndex2 is the current value of crt
        newIndex = newIndex2;
    }

    continue;
}

使用 CompareExchange 的替代方法是让值溢出。

我已经对此进行了测试并且无法证明它是错误的(到目前为止),但是当然这并不意味着它不是。

 //this incurs some cost, but "should" ensure that the int range
 // is mapped to the unit range (int.MinValue is mapped to 0 in the uint range)
 static ulong toPositive(int i) => (uint)1 + long.MaxValue + (uint)i;

 static A GetInstance()
 {
    //this seems to overflow safely without unchecked
    var newCounter = Interlocked.Increment(ref crt);
    //convert the counter to a list index, that is map the unsigned value
    //to a signed range and get the value modulus the itemCount value
    var newIndex = (int)(toPositive(newCounter) % (ulong)itemsCount);
    var instance = Items[newIndex];
    //Console.WriteLine($"{DateTime.Now.Ticks}, Got instance: {instance.ID}");
    return instance;
 }

PS: 我问题的xy问题的另一部分:在朋友的建议下,我目前正在研究使用LinkedList或类似的东西(带锁)来实现相同的目的。