如何实现序列的约束改组
How to implement constrained shuffling of a sequence
我需要模拟多线程场景的输出,其中多个线程并行处理有序序列。输出不再排序,但也没有完全洗牌。我认为实施这样的洗牌应该是微不足道的,不会超过 10-20 分钟。但它证明比我更棘手。所以现在经过许多小时的努力解决这个问题,并一路完善需求,我已经设法产生了一个具有非最佳统计行为的复杂实现。让我们先说明要求:
1) 该方法应该 return 延迟 IEnumerable
,以便可以打乱无限长度的序列。
2)每个单独元素的随机位移应该有一个硬性上限。
3)位移分布应近似平坦。例如,用 maxDisplacement = 2
打乱的 100 个元素的序列应该有 ~20 个元素被 -2 置换,~20 个元素被 -1 置换,~20 个元素没有置换,~20 个元素被 +1 置换,~20 个元素被 +2 置换。
4)洗牌应该是随机的。该方法的不同调用通常 return 一个不同的随机顺序。
输入输出示例。 20 个元素的序列用 maxDisplacement = 5
.
打乱顺序
Input: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
Output: 0, 3, 2, 5, 7, 1, 4, 6, 8, 12, 9, 11, 13, 10, 15, 16, 19, 14, 17, 18
以下是我迄今为止的最佳尝试:
public static IEnumerable<TSource> ConstrainedShuffle<TSource>(
this IEnumerable<TSource> source, Random random, int maxDisplacement)
{
if (maxDisplacement < 1)
throw new ArgumentOutOfRangeException(nameof(maxDisplacement));
random = random ?? new Random();
var buffer = new SortedDictionary<int, TSource>();
IEnumerable<(int Index, int BufferMaxIndex)> EnumerateInternal()
{
int index = -1;
int bufferMaxIndex = -1;
foreach (var item in source)
{
bufferMaxIndex++;
buffer.Add(bufferMaxIndex, item);
if (bufferMaxIndex >= maxDisplacement)
{
// Start yielding when buffer has maxDisplacement + 1 elements
index++;
yield return (index, bufferMaxIndex);
}
}
while (buffer.Count > 0) // Yield what is left in the buffer
{
while (!buffer.ContainsKey(bufferMaxIndex)) bufferMaxIndex--;
index++;
yield return (index, bufferMaxIndex);
}
}
foreach (var (index, bufferMaxIndex) in EnumerateInternal())
{
int bufferMinIndex = buffer.First().Key;
int selectedKey;
if (index - bufferMinIndex >= maxDisplacement)
{
// Forced picking of the earliest element
selectedKey = bufferMinIndex;
}
else
{
// Pick an element randomly (favoring earlier elements)
int bufferRange = bufferMaxIndex - bufferMinIndex + 1;
while (true)
{
var biasedRandom = Math.Pow(random.NextDouble(), 2.0);
var randomIndex = (int)(biasedRandom * bufferRange);
selectedKey = bufferMinIndex + randomIndex;
if (buffer.ContainsKey(selectedKey)) break;
}
}
yield return buffer[selectedKey];
buffer.Remove(selectedKey);
}
}
此实现不符合第三个要求。位移分布是一条奇怪的曲线,在最大正位移处有一个峰值(对于 maxDisplacement
的大值被大大夸大了)。这是 1,000,000 个元素序列的分布,用 maxDisplacement = 10
:
打乱
-10: 44,188
-9: 44,199
-8: 43,701
-7: 43,360
-6: 43,134
-5: 43,112
-4: 42,870
-3: 43,628
-2: 44,170
-1: 45,479
0: 50,029
+1: 58,611
+2: 67,077
+3: 71,663
+4: 70,175
+5: 62,914
+6: 52,835
+7: 40,974
+8: 30,553
+9: 21,210
+10: 36,118
Negative/Positive displacements: 437,841 / 512,130
我可能缺少解决此问题的更简单方法。
更新: 我实现了一个基于 Jim Mischel's 的解决方案,效果很好!洗牌关于正负位移是对称的,在洗牌块连接的点处没有可见的接缝,并且位移分布几乎是平坦的(较小的位移略有优势,但我同意)。速度也很快。
public static IEnumerable<TSource> ConstrainedShuffle_Probabilistic<TSource>(
this IEnumerable<TSource> source, Random random, int maxDisplacement)
{
if (maxDisplacement < 1)
throw new ArgumentOutOfRangeException(nameof(maxDisplacement));
random = random ?? new Random();
int chunkSize = Math.Max(100, maxDisplacement);
int seamSize = maxDisplacement;
int chunkSizePlus = chunkSize + 2 * seamSize;
var indexes = new List<int>(chunkSizePlus);
var chunk = new List<TSource>(chunkSizePlus + seamSize);
int chunkOffset = 0;
int indexesOffset = 0;
bool firstShuffle = true;
int index = -1;
foreach (var item in source)
{
index++;
chunk.Add(item);
indexes.Add(index);
if (indexes.Count >= chunkSizePlus)
{
if (firstShuffle)
{
ShuffleIndexes(0, indexes.Count - seamSize);
}
else
{
ShuffleIndexes(seamSize, seamSize + chunkSize);
}
for (int i = 0; i < chunkSize; i++)
{
yield return chunk[indexes[i] - chunkOffset];
}
if (!firstShuffle)
{
chunk.RemoveRange(0, chunkSize);
chunkOffset += chunkSize;
}
indexes.RemoveRange(0, chunkSize);
indexesOffset += chunkSize;
firstShuffle = false;
}
}
if (firstShuffle)
{
ShuffleIndexes(0, indexes.Count);
}
else
{
ShuffleIndexes(seamSize, indexes.Count);
}
for (int i = 0; i < indexes.Count; i++)
{
yield return chunk[indexes[i] - chunkOffset];
}
void ShuffleIndexes(int suffleFrom, int suffleToExclusive)
{
var range = Enumerable
.Range(suffleFrom, suffleToExclusive - suffleFrom).ToList();
Shuffle(range);
foreach (var i in range)
{
int index1 = indexes[i];
int randomFrom = Math.Max(0, index1 - indexesOffset - maxDisplacement);
int randomToExclusive = Math.Min(indexes.Count,
index1 - indexesOffset + maxDisplacement + 1);
int selectedIndex;
int collisions = 0;
while (true)
{
selectedIndex = random.Next(randomFrom, randomToExclusive);
int index2 = indexes[selectedIndex];
if (Math.Abs(i + indexesOffset - index2) <= maxDisplacement) break;
collisions++;
if (collisions >= 20) // Average collisions is < 1
{
selectedIndex = -1;
break;
}
}
if (selectedIndex != i && selectedIndex != -1)
{
var temp = indexes[i];
indexes[i] = indexes[selectedIndex];
indexes[selectedIndex] = temp;
}
}
}
void Shuffle(List<int> list)
{
for (int i = 0; i < list.Count; i++)
{
int j = random.Next(i, list.Count);
if (i == j) continue;
var temp = list[i];
list[i] = list[j];
list[j] = temp;
}
}
}
位移分布样本。 1,000,000 个元素的序列用 maxDisplacement = 1000
打乱顺序,然后将位移分组并显示平均出现次数:
[-1000..-801]: 443
[-800..-601]: 466
[-600..-401]: 496
[-400..-201]: 525
[-200..-1]: 553
[0]: 542
[+1..+200]: 563
[+201..+400]: 546
[+401..+600]: 514
[+601..+800]: 475
[+801..+1000]: 418
执行时间:450 毫秒
我有一个想法应该适用于有限数组。
给定最大位移 2:
- 索引 0 可以移动到索引 1 或 2
- 索引 1 可以移动到索引 0、2 或 3
- 索引 2 可以移动到索引 0、1、3 或 4
- 等等
- 索引 8 可以移动到 6、7 或 9
- 索引 9 可以移动到 7 或 8
这是我的想法。让我们使用一个包含 10 个项目的数组:
working = [0,1,2,3,4,5,6,7,8,9]
available = [0,1,2,3,4,5,6,7,8,9] // make a copy of the initial array
avail_count = 10
现在执行以下操作直到 avail_count < 2:
- Select
available
数组中的随机项。
- Select 一个介于 -2 和 +2 之间的随机数(包括 0、1、8 和 9 的特殊情况,您的范围有限)。
- 将您的偏移量添加到您 select 编辑的数字中。这为您提供了一个索引,您将使用该索引交换您在第 1 步中 select 编辑的项目。(这并不总是有效,请参见下文。)
- 交换
working
数组中的这两项。
- 在
available
数组中,通过替换为最后一项并减少计数来删除两个交换的项目。
让我举个例子。
- 在 0 到 9 之间随机选择一个数,包括 0 和 9,然后从
available
数组中拉出该项目。假设随机数是 5。available[5]
处的项目是 5。
- 选择一个随机偏移量。假设您选择了 -2。
- 将 -2 添加到 5,得到 3:要交换的索引。
- 交换这两项,结果是:
working = [0,1,2,5,4,3,6,7,8,9]
第 5 步,从 available
数组中删除 3 和 5 并相应减少计数:
available = [0,1,2,3,4,9,6,7,8] count = 9
available = [0,1,2,8,4,9,6,7] count = 8
下一次迭代将说明我在步骤 #3 中提到的问题。
- 在 0 到 7 之间随机选择一个数字,包括 0 和 7。假设我们选择了 2。那里的项目是 2。
- 选择一个随机偏移量。假设我们选择了 1.
- 1 加 2,得到 3。现在我们有问题了。
working[3]
处的项目是 5。我们不能将 2 与 5 交换,因为这样做会导致位移 3,这比您声明的 max-displacement. 高
我可以想到两种方法来解决这个问题。第一个很容易。如果 working[index]
处的项目不等于 index
,则假设您不能交换:将其视为随机偏移量为 0。只需从 [=12= 中删除第一个索引]数组,减少计数,继续
另一种方法是构建一个包含 -max_displacement..+max_displacement
范围内所有符合条件的项目的数组,并随机 select 一个。这有 O(max_displacement*2) 的缺点,但会起作用。
无论如何,如果您继续这样做直到 count < 2
,那么您将对数组进行洗牌,保持您的置换规则。这是否会给你你想要的位移分布是另一个问题。我必须编写代码并试一试才能确定。
现在,让它在流上运行?我的第一个尝试是在 large-ish 块中进行。必须多考虑一下。
想法:预先计算大小为 n 的缓冲区的所有可能选项?例如对于 -1,0,+1 和缓冲区 3 你得到 [0,1,2],[0,2,1],[1,0,2],[1,0,3 carry 2] 假设没有从前发扬光大。所以...
[0,1,2] has a total shift of 0
[0,2,1] has a total shift of 2
[1,0,2] has a total shift of 2
[1,0,3] carry 2 has a total shift of 3
对有结转的情况做同样的事情(开始时有两种状态,一种没有结转,一种有结转,结转必须落在此简化案例中的第一个单元格)。
现在您可以为每个模式分配概率以满足平坦分布,并可以相应地随机选择一个。这将输出所有 N 个下一个值,然后您取进位并重新开始。
很明显,对于大于 -1,0,1 的东西,您将有更多的可能性,并且您也可能有更多的项目可以结转。
现在,你能简化一下吗?也许使用相对偏移量将选择放在有向图中?重复时将图形循环回来。为每个分支分配概率以获得平坦分布。也许把它变成一个有限状态机。
奖励积分:创建一个有限状态机来实现算法但不知道转换的概率。现在使用机器学习来训练它,通过为转换分配概率来获得平坦分布。
我本来会发表评论的,因为它不是直接的答案,但它变得太长了;
我需要模拟多线程场景的输出,其中多个线程并行处理有序序列。输出不再排序,但也没有完全洗牌。我认为实施这样的洗牌应该是微不足道的,不会超过 10-20 分钟。但它证明比我更棘手。所以现在经过许多小时的努力解决这个问题,并一路完善需求,我已经设法产生了一个具有非最佳统计行为的复杂实现。让我们先说明要求:
1) 该方法应该 return 延迟 IEnumerable
,以便可以打乱无限长度的序列。
2)每个单独元素的随机位移应该有一个硬性上限。
3)位移分布应近似平坦。例如,用 maxDisplacement = 2
打乱的 100 个元素的序列应该有 ~20 个元素被 -2 置换,~20 个元素被 -1 置换,~20 个元素没有置换,~20 个元素被 +1 置换,~20 个元素被 +2 置换。
4)洗牌应该是随机的。该方法的不同调用通常 return 一个不同的随机顺序。
输入输出示例。 20 个元素的序列用 maxDisplacement = 5
.
Input: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
Output: 0, 3, 2, 5, 7, 1, 4, 6, 8, 12, 9, 11, 13, 10, 15, 16, 19, 14, 17, 18
以下是我迄今为止的最佳尝试:
public static IEnumerable<TSource> ConstrainedShuffle<TSource>(
this IEnumerable<TSource> source, Random random, int maxDisplacement)
{
if (maxDisplacement < 1)
throw new ArgumentOutOfRangeException(nameof(maxDisplacement));
random = random ?? new Random();
var buffer = new SortedDictionary<int, TSource>();
IEnumerable<(int Index, int BufferMaxIndex)> EnumerateInternal()
{
int index = -1;
int bufferMaxIndex = -1;
foreach (var item in source)
{
bufferMaxIndex++;
buffer.Add(bufferMaxIndex, item);
if (bufferMaxIndex >= maxDisplacement)
{
// Start yielding when buffer has maxDisplacement + 1 elements
index++;
yield return (index, bufferMaxIndex);
}
}
while (buffer.Count > 0) // Yield what is left in the buffer
{
while (!buffer.ContainsKey(bufferMaxIndex)) bufferMaxIndex--;
index++;
yield return (index, bufferMaxIndex);
}
}
foreach (var (index, bufferMaxIndex) in EnumerateInternal())
{
int bufferMinIndex = buffer.First().Key;
int selectedKey;
if (index - bufferMinIndex >= maxDisplacement)
{
// Forced picking of the earliest element
selectedKey = bufferMinIndex;
}
else
{
// Pick an element randomly (favoring earlier elements)
int bufferRange = bufferMaxIndex - bufferMinIndex + 1;
while (true)
{
var biasedRandom = Math.Pow(random.NextDouble(), 2.0);
var randomIndex = (int)(biasedRandom * bufferRange);
selectedKey = bufferMinIndex + randomIndex;
if (buffer.ContainsKey(selectedKey)) break;
}
}
yield return buffer[selectedKey];
buffer.Remove(selectedKey);
}
}
此实现不符合第三个要求。位移分布是一条奇怪的曲线,在最大正位移处有一个峰值(对于 maxDisplacement
的大值被大大夸大了)。这是 1,000,000 个元素序列的分布,用 maxDisplacement = 10
:
-10: 44,188
-9: 44,199
-8: 43,701
-7: 43,360
-6: 43,134
-5: 43,112
-4: 42,870
-3: 43,628
-2: 44,170
-1: 45,479
0: 50,029
+1: 58,611
+2: 67,077
+3: 71,663
+4: 70,175
+5: 62,914
+6: 52,835
+7: 40,974
+8: 30,553
+9: 21,210
+10: 36,118
Negative/Positive displacements: 437,841 / 512,130
我可能缺少解决此问题的更简单方法。
更新: 我实现了一个基于 Jim Mischel's
public static IEnumerable<TSource> ConstrainedShuffle_Probabilistic<TSource>(
this IEnumerable<TSource> source, Random random, int maxDisplacement)
{
if (maxDisplacement < 1)
throw new ArgumentOutOfRangeException(nameof(maxDisplacement));
random = random ?? new Random();
int chunkSize = Math.Max(100, maxDisplacement);
int seamSize = maxDisplacement;
int chunkSizePlus = chunkSize + 2 * seamSize;
var indexes = new List<int>(chunkSizePlus);
var chunk = new List<TSource>(chunkSizePlus + seamSize);
int chunkOffset = 0;
int indexesOffset = 0;
bool firstShuffle = true;
int index = -1;
foreach (var item in source)
{
index++;
chunk.Add(item);
indexes.Add(index);
if (indexes.Count >= chunkSizePlus)
{
if (firstShuffle)
{
ShuffleIndexes(0, indexes.Count - seamSize);
}
else
{
ShuffleIndexes(seamSize, seamSize + chunkSize);
}
for (int i = 0; i < chunkSize; i++)
{
yield return chunk[indexes[i] - chunkOffset];
}
if (!firstShuffle)
{
chunk.RemoveRange(0, chunkSize);
chunkOffset += chunkSize;
}
indexes.RemoveRange(0, chunkSize);
indexesOffset += chunkSize;
firstShuffle = false;
}
}
if (firstShuffle)
{
ShuffleIndexes(0, indexes.Count);
}
else
{
ShuffleIndexes(seamSize, indexes.Count);
}
for (int i = 0; i < indexes.Count; i++)
{
yield return chunk[indexes[i] - chunkOffset];
}
void ShuffleIndexes(int suffleFrom, int suffleToExclusive)
{
var range = Enumerable
.Range(suffleFrom, suffleToExclusive - suffleFrom).ToList();
Shuffle(range);
foreach (var i in range)
{
int index1 = indexes[i];
int randomFrom = Math.Max(0, index1 - indexesOffset - maxDisplacement);
int randomToExclusive = Math.Min(indexes.Count,
index1 - indexesOffset + maxDisplacement + 1);
int selectedIndex;
int collisions = 0;
while (true)
{
selectedIndex = random.Next(randomFrom, randomToExclusive);
int index2 = indexes[selectedIndex];
if (Math.Abs(i + indexesOffset - index2) <= maxDisplacement) break;
collisions++;
if (collisions >= 20) // Average collisions is < 1
{
selectedIndex = -1;
break;
}
}
if (selectedIndex != i && selectedIndex != -1)
{
var temp = indexes[i];
indexes[i] = indexes[selectedIndex];
indexes[selectedIndex] = temp;
}
}
}
void Shuffle(List<int> list)
{
for (int i = 0; i < list.Count; i++)
{
int j = random.Next(i, list.Count);
if (i == j) continue;
var temp = list[i];
list[i] = list[j];
list[j] = temp;
}
}
}
位移分布样本。 1,000,000 个元素的序列用 maxDisplacement = 1000
打乱顺序,然后将位移分组并显示平均出现次数:
[-1000..-801]: 443
[-800..-601]: 466
[-600..-401]: 496
[-400..-201]: 525
[-200..-1]: 553
[0]: 542
[+1..+200]: 563
[+201..+400]: 546
[+401..+600]: 514
[+601..+800]: 475
[+801..+1000]: 418
执行时间:450 毫秒
我有一个想法应该适用于有限数组。
给定最大位移 2:
- 索引 0 可以移动到索引 1 或 2
- 索引 1 可以移动到索引 0、2 或 3
- 索引 2 可以移动到索引 0、1、3 或 4
- 等等
- 索引 8 可以移动到 6、7 或 9
- 索引 9 可以移动到 7 或 8
这是我的想法。让我们使用一个包含 10 个项目的数组:
working = [0,1,2,3,4,5,6,7,8,9]
available = [0,1,2,3,4,5,6,7,8,9] // make a copy of the initial array
avail_count = 10
现在执行以下操作直到 avail_count < 2:
- Select
available
数组中的随机项。 - Select 一个介于 -2 和 +2 之间的随机数(包括 0、1、8 和 9 的特殊情况,您的范围有限)。
- 将您的偏移量添加到您 select 编辑的数字中。这为您提供了一个索引,您将使用该索引交换您在第 1 步中 select 编辑的项目。(这并不总是有效,请参见下文。)
- 交换
working
数组中的这两项。 - 在
available
数组中,通过替换为最后一项并减少计数来删除两个交换的项目。
让我举个例子。
- 在 0 到 9 之间随机选择一个数,包括 0 和 9,然后从
available
数组中拉出该项目。假设随机数是 5。available[5]
处的项目是 5。 - 选择一个随机偏移量。假设您选择了 -2。
- 将 -2 添加到 5,得到 3:要交换的索引。
- 交换这两项,结果是:
working = [0,1,2,5,4,3,6,7,8,9]
第 5 步,从 available
数组中删除 3 和 5 并相应减少计数:
available = [0,1,2,3,4,9,6,7,8] count = 9
available = [0,1,2,8,4,9,6,7] count = 8
下一次迭代将说明我在步骤 #3 中提到的问题。
- 在 0 到 7 之间随机选择一个数字,包括 0 和 7。假设我们选择了 2。那里的项目是 2。
- 选择一个随机偏移量。假设我们选择了 1.
- 1 加 2,得到 3。现在我们有问题了。
working[3]
处的项目是 5。我们不能将 2 与 5 交换,因为这样做会导致位移 3,这比您声明的 max-displacement. 高
我可以想到两种方法来解决这个问题。第一个很容易。如果 working[index]
处的项目不等于 index
,则假设您不能交换:将其视为随机偏移量为 0。只需从 [=12= 中删除第一个索引]数组,减少计数,继续
另一种方法是构建一个包含 -max_displacement..+max_displacement
范围内所有符合条件的项目的数组,并随机 select 一个。这有 O(max_displacement*2) 的缺点,但会起作用。
无论如何,如果您继续这样做直到 count < 2
,那么您将对数组进行洗牌,保持您的置换规则。这是否会给你你想要的位移分布是另一个问题。我必须编写代码并试一试才能确定。
现在,让它在流上运行?我的第一个尝试是在 large-ish 块中进行。必须多考虑一下。
想法:预先计算大小为 n 的缓冲区的所有可能选项?例如对于 -1,0,+1 和缓冲区 3 你得到 [0,1,2],[0,2,1],[1,0,2],[1,0,3 carry 2] 假设没有从前发扬光大。所以...
[0,1,2] has a total shift of 0
[0,2,1] has a total shift of 2
[1,0,2] has a total shift of 2
[1,0,3] carry 2 has a total shift of 3
对有结转的情况做同样的事情(开始时有两种状态,一种没有结转,一种有结转,结转必须落在此简化案例中的第一个单元格)。
现在您可以为每个模式分配概率以满足平坦分布,并可以相应地随机选择一个。这将输出所有 N 个下一个值,然后您取进位并重新开始。
很明显,对于大于 -1,0,1 的东西,您将有更多的可能性,并且您也可能有更多的项目可以结转。
现在,你能简化一下吗?也许使用相对偏移量将选择放在有向图中?重复时将图形循环回来。为每个分支分配概率以获得平坦分布。也许把它变成一个有限状态机。
奖励积分:创建一个有限状态机来实现算法但不知道转换的概率。现在使用机器学习来训练它,通过为转换分配概率来获得平坦分布。
我本来会发表评论的,因为它不是直接的答案,但它变得太长了;