Interlocked CompareExchange 能否在此多线程循环实现中正确使用?
Can Interlocked CompareExchange be used correctly in this multithreaded round-robin implementation?
由于多线程上下文中的某些速率限制,我需要在 N 个不同连接之间循环调用某些调用。我决定使用一个列表和一个“计数器”来实现这个功能,它应该在每次调用的实例之间“跳一个”。
我将用一个最小的例子来说明这个概念(使用一个名为 A 的 class 来代表连接)
class A
{
public A()
{
var newIndex = Interlocked.Increment(ref index);
ID = newIndex.ToString();
}
private static int index;
public string ID;
}
static int crt = 0;
static List<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static int itemsCount = Items.Count;
static A GetInstance()
{
var newIndex = Interlocked.Increment(ref crt);
var instance = Items[newIndex % itemsCount];
//Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
return instance;
}
static void Test()
{
var sw = Stopwatch.StartNew();
var tasks = Enumerable.Range(1, 1000000).Select(i => Task.Run(GetInstance)).ToArray();
Task.WaitAll(tasks);
}
这按预期工作,因为它确保调用在连接之间循环。我可能会在“真实”代码中坚持这个实现(用 long 而不是 int 作为计数器)
然而,即使在我的用例中不太可能达到 int.MaxValue,我想知道是否有办法“安全地溢出”计数器。
我知道 C# 中的“%”是“余数”而不是“模数”,这意味着某些 ?: 体操需要始终 return 正数,我想避免这种情况。
所以我想得出的结果是:
static A GetInstance()
{
var newIndex = Interlocked.Increment(ref crt);
Interlocked.CompareExchange(ref crt, 0, itemsCount); //?? the return value is the original value, how to know if it succeeded
var instance = Items[newIndex];
//Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
return instance;
}
我期望的是 Interlocked.CompareExchange(ref crt, 0, itemsCount)
将仅由一个线程“赢得”,一旦达到可用连接数就将计数器设置回 0。但是,我不知道如何在这种情况下使用它。
这里可以使用 CompareExchange 或 Interlocked 中的其他机制吗?
不,Interlocked
class 没有提供允许您在 Int32
值溢出时将其恢复为零的机制。原因是两个线程有可能同时调用 var newIndex = Interlocked.Increment(ref crt);
语句,在这种情况下都溢出计数器,然后 none 将成功地将值更新回零。此功能刚好超出 Interlocked
class 的功能。要使此类复杂操作成为原子操作,您需要使用其他一些同步机制,例如 lock
.
更新: xanatos 的 proves that the above statement is wrong. It is also proven wrong by the answers of this 9-year old question. Below are two implementation of an InterlockedIncrementRoundRobin
method. The first is a simplified version of this 回答,Alex Sorokoletov:
public static int InterlockedRoundRobinIncrement(ref int location, int modulo)
{
// Arguments validation omitted (the modulo should be a positive number)
uint current = unchecked((uint)Interlocked.Increment(ref location));
return (int)(current % modulo);
}
这个实现非常有效,但它的缺点是支持 int
值不能直接使用,因为它遍历了 Int32
类型的整个范围(包括负值) .可用信息来自方法本身的 return 值,保证在 [0..modulo]
范围内。如果您想读取当前值而不增加它,您需要另一个类似的方法来执行相同的 int -> uint -> int
转换:
public static int InterlockedRoundRobinRead(ref int location, int modulo)
{
uint current = unchecked((uint)Volatile.Read(ref location));
return (int)(current % modulo);
}
它还有一个缺点,即每 4,294,967,296 递增一次,除非 modulo
是 2 的幂,否则它 return 在达到 modulo - 1
值。换句话说,翻转逻辑在技术上存在缺陷。这可能是也可能不是大问题,具体取决于应用程序。
第二个实现是 xanatos 的修改版本 :
public static int InterlockedRoundRobinIncrement(ref int location, int modulo)
{
// Arguments validation omitted (the modulo should be a positive number)
while (true)
{
int current = Interlocked.Increment(ref location);
if (current >= 0 && current < modulo) return current;
// Overflow. Try to zero the number.
while (true)
{
int current2 = Interlocked.CompareExchange(ref location, 0, current);
if (current2 == current) return 0; // Success
current = current2;
if (current >= 0 && current < modulo)
{
break; // Another thread zeroed the number. Retry increment.
}
}
}
}
这效率稍低(尤其是对于较小的 modulo
值),因为偶尔 Interlocked.Increment
操作会导致值超出范围,并且该值会被拒绝并重复操作。它确实有优势,尽管支持 int
值保持在 [0..modulo]
范围内,除了一些非常短的时间跨度,在此方法的某些调用期间。
你可能会:
static int crt = -1;
static readonly IReadOnlyList<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static readonly int itemsCount = Items.Count;
static readonly int maxItemCount = itemsCount * 100;
static A GetInstance()
{
int newIndex;
while (true)
{
newIndex = Interlocked.Increment(ref crt);
if (newIndex >= itemsCount)
{
while (newIndex >= itemsCount && Interlocked.CompareExchange(ref crt, -1, newIndex) != newIndex)
{
// There is an implicit memory barrier caused by the Interlockd.CompareExchange around the
// next line
// See for example https://afana.me/archive/2015/07/10/memory-barriers-in-dot-net.aspx/
// A full memory barrier is the strongest and interesting one. At least all of the following generate a full memory barrier implicitly:
// Interlocked class mehods
newIndex = crt;
}
continue;
}
break;
}
var instance = Items[newIndex % itemsCount];
//Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
return instance;
}
但我不得不说实话...我不确定它是否正确(应该是),解释它很难,如果有人以任何方式触摸它,它就会坏掉。
基本思想是为 crt
设置一个“低”上限(我们不想溢出,它会破坏一切......所以我们想让 veeeeeery 远离 int.MaxValue
, 或者你可以使用 uint
).
最大可能值为:
maxItemCount = (int.MaxValue - MaximumNumberOfThreads) / itemsCount * itemsCount;
/ itemsCount * itemsCount
是因为我们希望轮次平均分配。在我给出的示例中,我使用的数字可能要低得多(itemsCount * 100
),因为降低此上限只会导致更频繁地重置,但重置不会慢到真正重要的程度(这取决于什么你在线程上做。如果它们是非常小的线程,只使用 cpu 那么重置很慢,但如果不是那么它不是。
然后当我们溢出这个上限时,我们尝试将它移回 -1
(我们的起点)。我们知道同时其他坏的坏线程可以 Interlocked.Increment
它并在这个重置上创建一个竞争。由于 Interlocked.CompareExchange
只有一个线程可以成功重置计数器,但其他线程会立即看到这一点并中断它们的尝试。
嗯... if
可以重写为:
if (newIndex >= itemsCount)
{
int newIndex2;
while (newIndex >= itemsCount && (newIndex2 = Interlocked.CompareExchange(ref crt, 0, newIndex)) != newIndex)
{
// If the Interlocked.CompareExchange is successfull, the while will end and so we won't be here,
// if it fails, newIndex2 is the current value of crt
newIndex = newIndex2;
}
continue;
}
使用 CompareExchange 的替代方法是让值溢出。
我已经对此进行了测试并且无法证明它是错误的(到目前为止),但是当然这并不意味着它不是。
//this incurs some cost, but "should" ensure that the int range
// is mapped to the unit range (int.MinValue is mapped to 0 in the uint range)
static ulong toPositive(int i) => (uint)1 + long.MaxValue + (uint)i;
static A GetInstance()
{
//this seems to overflow safely without unchecked
var newCounter = Interlocked.Increment(ref crt);
//convert the counter to a list index, that is map the unsigned value
//to a signed range and get the value modulus the itemCount value
var newIndex = (int)(toPositive(newCounter) % (ulong)itemsCount);
var instance = Items[newIndex];
//Console.WriteLine($"{DateTime.Now.Ticks}, Got instance: {instance.ID}");
return instance;
}
PS: 我问题的xy问题的另一部分:在朋友的建议下,我目前正在研究使用LinkedList或类似的东西(带锁)来实现相同的目的。
由于多线程上下文中的某些速率限制,我需要在 N 个不同连接之间循环调用某些调用。我决定使用一个列表和一个“计数器”来实现这个功能,它应该在每次调用的实例之间“跳一个”。
我将用一个最小的例子来说明这个概念(使用一个名为 A 的 class 来代表连接)
class A
{
public A()
{
var newIndex = Interlocked.Increment(ref index);
ID = newIndex.ToString();
}
private static int index;
public string ID;
}
static int crt = 0;
static List<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static int itemsCount = Items.Count;
static A GetInstance()
{
var newIndex = Interlocked.Increment(ref crt);
var instance = Items[newIndex % itemsCount];
//Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
return instance;
}
static void Test()
{
var sw = Stopwatch.StartNew();
var tasks = Enumerable.Range(1, 1000000).Select(i => Task.Run(GetInstance)).ToArray();
Task.WaitAll(tasks);
}
这按预期工作,因为它确保调用在连接之间循环。我可能会在“真实”代码中坚持这个实现(用 long 而不是 int 作为计数器)
然而,即使在我的用例中不太可能达到 int.MaxValue,我想知道是否有办法“安全地溢出”计数器。
我知道 C# 中的“%”是“余数”而不是“模数”,这意味着某些 ?: 体操需要始终 return 正数,我想避免这种情况。
所以我想得出的结果是:
static A GetInstance()
{
var newIndex = Interlocked.Increment(ref crt);
Interlocked.CompareExchange(ref crt, 0, itemsCount); //?? the return value is the original value, how to know if it succeeded
var instance = Items[newIndex];
//Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
return instance;
}
我期望的是 Interlocked.CompareExchange(ref crt, 0, itemsCount)
将仅由一个线程“赢得”,一旦达到可用连接数就将计数器设置回 0。但是,我不知道如何在这种情况下使用它。
这里可以使用 CompareExchange 或 Interlocked 中的其他机制吗?
不,Interlocked
class 没有提供允许您在 Int32
值溢出时将其恢复为零的机制。原因是两个线程有可能同时调用 var newIndex = Interlocked.Increment(ref crt);
语句,在这种情况下都溢出计数器,然后 none 将成功地将值更新回零。此功能刚好超出 Interlocked
class 的功能。要使此类复杂操作成为原子操作,您需要使用其他一些同步机制,例如 lock
.
更新: xanatos 的 InterlockedIncrementRoundRobin
method. The first is a simplified version of this 回答,Alex Sorokoletov:
public static int InterlockedRoundRobinIncrement(ref int location, int modulo)
{
// Arguments validation omitted (the modulo should be a positive number)
uint current = unchecked((uint)Interlocked.Increment(ref location));
return (int)(current % modulo);
}
这个实现非常有效,但它的缺点是支持 int
值不能直接使用,因为它遍历了 Int32
类型的整个范围(包括负值) .可用信息来自方法本身的 return 值,保证在 [0..modulo]
范围内。如果您想读取当前值而不增加它,您需要另一个类似的方法来执行相同的 int -> uint -> int
转换:
public static int InterlockedRoundRobinRead(ref int location, int modulo)
{
uint current = unchecked((uint)Volatile.Read(ref location));
return (int)(current % modulo);
}
它还有一个缺点,即每 4,294,967,296 递增一次,除非 modulo
是 2 的幂,否则它 return 在达到 modulo - 1
值。换句话说,翻转逻辑在技术上存在缺陷。这可能是也可能不是大问题,具体取决于应用程序。
第二个实现是 xanatos 的修改版本
public static int InterlockedRoundRobinIncrement(ref int location, int modulo)
{
// Arguments validation omitted (the modulo should be a positive number)
while (true)
{
int current = Interlocked.Increment(ref location);
if (current >= 0 && current < modulo) return current;
// Overflow. Try to zero the number.
while (true)
{
int current2 = Interlocked.CompareExchange(ref location, 0, current);
if (current2 == current) return 0; // Success
current = current2;
if (current >= 0 && current < modulo)
{
break; // Another thread zeroed the number. Retry increment.
}
}
}
}
这效率稍低(尤其是对于较小的 modulo
值),因为偶尔 Interlocked.Increment
操作会导致值超出范围,并且该值会被拒绝并重复操作。它确实有优势,尽管支持 int
值保持在 [0..modulo]
范围内,除了一些非常短的时间跨度,在此方法的某些调用期间。
你可能会:
static int crt = -1;
static readonly IReadOnlyList<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static readonly int itemsCount = Items.Count;
static readonly int maxItemCount = itemsCount * 100;
static A GetInstance()
{
int newIndex;
while (true)
{
newIndex = Interlocked.Increment(ref crt);
if (newIndex >= itemsCount)
{
while (newIndex >= itemsCount && Interlocked.CompareExchange(ref crt, -1, newIndex) != newIndex)
{
// There is an implicit memory barrier caused by the Interlockd.CompareExchange around the
// next line
// See for example https://afana.me/archive/2015/07/10/memory-barriers-in-dot-net.aspx/
// A full memory barrier is the strongest and interesting one. At least all of the following generate a full memory barrier implicitly:
// Interlocked class mehods
newIndex = crt;
}
continue;
}
break;
}
var instance = Items[newIndex % itemsCount];
//Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
return instance;
}
但我不得不说实话...我不确定它是否正确(应该是),解释它很难,如果有人以任何方式触摸它,它就会坏掉。
基本思想是为 crt
设置一个“低”上限(我们不想溢出,它会破坏一切......所以我们想让 veeeeeery 远离 int.MaxValue
, 或者你可以使用 uint
).
最大可能值为:
maxItemCount = (int.MaxValue - MaximumNumberOfThreads) / itemsCount * itemsCount;
/ itemsCount * itemsCount
是因为我们希望轮次平均分配。在我给出的示例中,我使用的数字可能要低得多(itemsCount * 100
),因为降低此上限只会导致更频繁地重置,但重置不会慢到真正重要的程度(这取决于什么你在线程上做。如果它们是非常小的线程,只使用 cpu 那么重置很慢,但如果不是那么它不是。
然后当我们溢出这个上限时,我们尝试将它移回 -1
(我们的起点)。我们知道同时其他坏的坏线程可以 Interlocked.Increment
它并在这个重置上创建一个竞争。由于 Interlocked.CompareExchange
只有一个线程可以成功重置计数器,但其他线程会立即看到这一点并中断它们的尝试。
嗯... if
可以重写为:
if (newIndex >= itemsCount)
{
int newIndex2;
while (newIndex >= itemsCount && (newIndex2 = Interlocked.CompareExchange(ref crt, 0, newIndex)) != newIndex)
{
// If the Interlocked.CompareExchange is successfull, the while will end and so we won't be here,
// if it fails, newIndex2 is the current value of crt
newIndex = newIndex2;
}
continue;
}
使用 CompareExchange 的替代方法是让值溢出。
我已经对此进行了测试并且无法证明它是错误的(到目前为止),但是当然这并不意味着它不是。
//this incurs some cost, but "should" ensure that the int range
// is mapped to the unit range (int.MinValue is mapped to 0 in the uint range)
static ulong toPositive(int i) => (uint)1 + long.MaxValue + (uint)i;
static A GetInstance()
{
//this seems to overflow safely without unchecked
var newCounter = Interlocked.Increment(ref crt);
//convert the counter to a list index, that is map the unsigned value
//to a signed range and get the value modulus the itemCount value
var newIndex = (int)(toPositive(newCounter) % (ulong)itemsCount);
var instance = Items[newIndex];
//Console.WriteLine($"{DateTime.Now.Ticks}, Got instance: {instance.ID}");
return instance;
}
PS: 我问题的xy问题的另一部分:在朋友的建议下,我目前正在研究使用LinkedList或类似的东西(带锁)来实现相同的目的。