如何在没有左偏的情况下合并多个异步序列?
How to merge multiple asynchronous sequences without left-side bias?
我有几个 AsyncEnumerable<string>
想合并成一个 AsyncEnumerable<string>
,它应该包含从这些序列同时发出的所有元素。所以我使用了 Merge
operator from the System.Interactive.Async 包。问题是该运算符并不总是将所有序列视为相等的。在某些情况下,它更喜欢从参数列表左侧的序列中发射元素,而忽略参数列表右侧的序列。这是重现这种不良行为的最小示例:
var sequence_A = Enumerable.Range(1, 5).Select(i => $"A{i}").ToAsyncEnumerable();
var sequence_B = Enumerable.Range(1, 5).Select(i => $"B{i}").ToAsyncEnumerable();
var sequence_C = Enumerable.Range(1, 5).Select(i => $"C{i}").ToAsyncEnumerable();
var merged = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C);
await foreach (var item in merged) Console.WriteLine(item);
此代码片段还依赖于 System.Linq.Async 包。 sequence_A
从"A"
开始发射5个元素,sequence_B
从"B"
开始发射5个元素,sequence_C
从[=23=开始发射5个元素].
输出(不理想):
A1
A2
A3
A4
A5
B1
B2
B3
B4
B5
C1
C2
C3
C4
C5
理想的输出应如下所示:
A1
B1
C1
A2
B2
C2
A3
B3
C3
A4
B4
C4
A5
B5
C5
如果所有序列都有其下一个元素可用,则合并后的序列应从每个序列中提取一个元素,而不是从最左侧的序列中重复提取元素。
如何确保我的序列公平合并?我正在寻找具有理想行为的官方包中的运算符组合,或者寻找可以执行我想要的操作的自定义 Merge
运算符。
澄清: 我对 concurrent Merge
功能很感兴趣,其中同时观察所有源序列,并且来自任何序列的任何发射都传播到合并的序列。当多个序列可以立即发射一个元素时,公平的概念适用,在这种情况下,它们的发射应该交错。在相反的情况下,当没有立即可用的元素时,规则是“先到先走”。
更新:这是一个更真实的演示,其中包括生产者序列和消费枚举循环中的延迟。它模拟了一种情况,即使用最左侧序列产生的值所花费的时间比产生这些值所需的时间更长。
var sequence_A = Produce("A", 200, 1, 2, 3, 4, 5);
var sequence_B = Produce("B", 150, 1, 2, 3, 4, 5);
var sequence_C = Produce("C", 100, 1, 2, 3, 4, 5);
var merged = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C);
await foreach (var item in merged)
{
Console.WriteLine(item);
await Task.Delay(item.StartsWith("A") ? 300 : 50); // Latency
}
async IAsyncEnumerable<string> Produce(string prefix, int delay, params int[] values)
{
foreach (var value in values)
{
var delayTask = Task.Delay(delay);
yield return $"{prefix}{value}";
await delayTask; // Latency
}
}
结果是 sequence_A
:
生成的值出现了不良偏差
A1
A2
A3
A4
A5
B1
B2
C1
B3
C2
B4
C3
C4
B5
C5
这是最终代码。该算法已被修改以适应 OP。我把原来的代码留在下面。
这里使用了贪心算法:返回第一个可用的值,不尝试依次合并。每次任务完成时,同一枚举器的下一个枚举器会返回到后面,以确保公平。
算法如下:
- 该函数接受
params
个源数组。
- 如果未提供源枚举,则提前退出。
- 创建一个列表以将枚举器及其各自的任务作为元组保存。
- 获取每个枚举器,调用
MoveNextAsync
并将该对存储在列表中。
- 在循环中,对整个列表调用
Task.WhenAny
。
- 获取结果
Task
并在列表中找到它的位置。
- 将元组保存在变量中并将其从列表中删除。
- 如果它返回
true
,则 yield
值并为匹配的枚举器再次调用 MoveNextAsync
,将生成的元组推到列表的后面。
- 如果它 returns
false
,则 Dispose
枚举数。
- 继续循环直到列表为空。
finally
块处理任何剩余的枚举器。
- 还有一个重载来提供取消令牌
在分配等方面有一些效率。我把它留给了 reader。
public static IAsyncEnumerable<T> Interleave<T>(params IAsyncEnumerable<T>[] sources) =>
Interleave(default, sources);
public static async IAsyncEnumerable<T> Interleave<T>([EnumeratorCancellation] CancellationToken token, IAsyncEnumerable<T>[] sources)
{
if(sources.Length == 0)
yield break;
var enumerators = new List<(IAsyncEnumerator<T> e, Task<bool> t)>(sources.Length);
try
{
for(var i = 0; i < sources.Length; i++)
{
var e = sources[i].GetAsyncEnumerator(token);
enumerators.Add((e, e.MoveNextAsync().AsTask()));
}
do
{
var taskResult = await Task.WhenAny(enumerators.Select(tuple => tuple.t));
var ind = enumerators.FindIndex(tuple => tuple.t == taskResult);
var tuple = enumerators[ind];
enumerators.RemoveAt(ind);
if(taskResult.Result)
{
yield return tuple.e.Current;
enumerators.Add((tuple.e, tuple.e.MoveNextAsync().AsTask()));
}
else
{
try
{
await tuple.e.DisposeAsync();
}
catch
{ //
}
}
} while (enumerators.Count > 0);
}
finally
{
for(var i = 0; i < enumerators.Count; i++)
{
try
{
await enumerators[i].e.DisposeAsync();
}
catch
{ //
}
}
}
}
EDIT 下面不是 OP 想要的,因为 OP 希望返回任何结果,以先到者为准。我将把它留在这里,因为它很好地演示了这个算法。
这是异步 Interleave
或 Merge
算法的完整实现 ,在 SQL 术语中更常见的是 合并串联.
算法如下:
- 该函数接受
params
个来源数组。
- 如果未提供源枚举,则提前退出。
- 创建一个列表来保存枚举数。
- 获取每个枚举器并将其存储在列表中。
- 在循环中,获取每个枚举器和
MoveNextAsync
。
- 如果它 returns
true
,则 yield
值并递增循环计数器。如果翻车了,就回到起点。
- 如果 returns
false
,则 Dispose
并从列表中删除。 不增加计数器。
- 继续循环,直到没有更多的枚举器。
finally
块处理任何剩余的枚举器。
- 还有一个重载来提供取消令牌
public static IAsyncEnumerable<T> Interleave<T>(params IAsyncEnumerable<T>[] sources) =>
Interleave(default, sources);
public static async IAsyncEnumerable<T> Interleave<T>([EnumeratorCancellation] CancellationToken token, IAsyncEnumerable<T>[] sources)
{
if(sources.Length == 0)
yield break;
var enumerators = new List<IAsyncEnumerator<T>>(sources.Length);
try
{
for(var i = 0; i < sources.Length; i++)
enumerators.Add(sources[i].GetAsyncEnumerator(token));
var j = 0;
do
{
if(await enumerators[j].MoveNextAsync())
{
yield return enumerators[j].Current;
j++;
if(j >= enumerators.Count)
j = 0;
}
else
{
try
{
await enumerators[j].DisposeAsync();
}
catch
{ //
}
enumerators.RemoveAt(j);
}
} while (enumerators.Count > 0);
}
finally
{
for(var i = 0; i < enumerators.Count; i++)
{
try
{
await enumerators[i].DisposeAsync();
}
catch
{ //
}
}
}
}
如果您只有固定数量的源枚举,这显然可以大大简化。
这个例子有点做作,因为所有结果都是立即可用的。即使添加了一个小的延迟,结果也是混合的:
var sequence_A = AsyncEnumerable.Range(1, 5)
.SelectAwait(async i =>{ await Task.Delay(i); return $"A{i}";});
var sequence_B = AsyncEnumerable.Range(1, 5)
.SelectAwait(async i =>{ await Task.Delay(i); return $"B{i}";});
var sequence_C = AsyncEnumerable.Range(1, 5)
.SelectAwait(async i =>{ await Task.Delay(i); return $"C{i}";});
var sequence_D = AsyncEnumerable.Range(1, 5)
.SelectAwait(async i =>{ await Task.Delay(i); return $"D{i}";});
await foreach (var item in seq) Console.WriteLine(item);
每次都会产生不同的混合结果:
B1
A1
C1
D1
D2
A2
B2
C2
D3
A3
B3
C3
C4
A4
B4
D4
D5
A5
B5
C5
method's comments 解释它被重新实现是为了更便宜和更公平:
//
// This new implementation of Merge differs from the original one in a few ways:
//
// - It's cheaper because:
// - no conversion from ValueTask<bool> to Task<bool> takes place using AsTask,
// - we don't instantiate Task.WhenAny tasks for each iteration.
// - It's fairer because:
// - the MoveNextAsync tasks are awaited concurently, but completions are queued,
// instead of awaiting a new WhenAny task where "left" sources have preferential
// treatment over "right" sources.
//
我有几个 AsyncEnumerable<string>
想合并成一个 AsyncEnumerable<string>
,它应该包含从这些序列同时发出的所有元素。所以我使用了 Merge
operator from the System.Interactive.Async 包。问题是该运算符并不总是将所有序列视为相等的。在某些情况下,它更喜欢从参数列表左侧的序列中发射元素,而忽略参数列表右侧的序列。这是重现这种不良行为的最小示例:
var sequence_A = Enumerable.Range(1, 5).Select(i => $"A{i}").ToAsyncEnumerable();
var sequence_B = Enumerable.Range(1, 5).Select(i => $"B{i}").ToAsyncEnumerable();
var sequence_C = Enumerable.Range(1, 5).Select(i => $"C{i}").ToAsyncEnumerable();
var merged = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C);
await foreach (var item in merged) Console.WriteLine(item);
此代码片段还依赖于 System.Linq.Async 包。 sequence_A
从"A"
开始发射5个元素,sequence_B
从"B"
开始发射5个元素,sequence_C
从[=23=开始发射5个元素].
输出(不理想):
A1
A2
A3
A4
A5
B1
B2
B3
B4
B5
C1
C2
C3
C4
C5
理想的输出应如下所示:
A1
B1
C1
A2
B2
C2
A3
B3
C3
A4
B4
C4
A5
B5
C5
如果所有序列都有其下一个元素可用,则合并后的序列应从每个序列中提取一个元素,而不是从最左侧的序列中重复提取元素。
如何确保我的序列公平合并?我正在寻找具有理想行为的官方包中的运算符组合,或者寻找可以执行我想要的操作的自定义 Merge
运算符。
澄清: 我对 concurrent Merge
功能很感兴趣,其中同时观察所有源序列,并且来自任何序列的任何发射都传播到合并的序列。当多个序列可以立即发射一个元素时,公平的概念适用,在这种情况下,它们的发射应该交错。在相反的情况下,当没有立即可用的元素时,规则是“先到先走”。
更新:这是一个更真实的演示,其中包括生产者序列和消费枚举循环中的延迟。它模拟了一种情况,即使用最左侧序列产生的值所花费的时间比产生这些值所需的时间更长。
var sequence_A = Produce("A", 200, 1, 2, 3, 4, 5);
var sequence_B = Produce("B", 150, 1, 2, 3, 4, 5);
var sequence_C = Produce("C", 100, 1, 2, 3, 4, 5);
var merged = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C);
await foreach (var item in merged)
{
Console.WriteLine(item);
await Task.Delay(item.StartsWith("A") ? 300 : 50); // Latency
}
async IAsyncEnumerable<string> Produce(string prefix, int delay, params int[] values)
{
foreach (var value in values)
{
var delayTask = Task.Delay(delay);
yield return $"{prefix}{value}";
await delayTask; // Latency
}
}
结果是 sequence_A
:
A1
A2
A3
A4
A5
B1
B2
C1
B3
C2
B4
C3
C4
B5
C5
这是最终代码。该算法已被修改以适应 OP。我把原来的代码留在下面。
这里使用了贪心算法:返回第一个可用的值,不尝试依次合并。每次任务完成时,同一枚举器的下一个枚举器会返回到后面,以确保公平。
算法如下:
- 该函数接受
params
个源数组。 - 如果未提供源枚举,则提前退出。
- 创建一个列表以将枚举器及其各自的任务作为元组保存。
- 获取每个枚举器,调用
MoveNextAsync
并将该对存储在列表中。 - 在循环中,对整个列表调用
Task.WhenAny
。 - 获取结果
Task
并在列表中找到它的位置。 - 将元组保存在变量中并将其从列表中删除。
- 如果它返回
true
,则yield
值并为匹配的枚举器再次调用MoveNextAsync
,将生成的元组推到列表的后面。 - 如果它 returns
false
,则Dispose
枚举数。 - 继续循环直到列表为空。
finally
块处理任何剩余的枚举器。- 还有一个重载来提供取消令牌
在分配等方面有一些效率。我把它留给了 reader。
public static IAsyncEnumerable<T> Interleave<T>(params IAsyncEnumerable<T>[] sources) =>
Interleave(default, sources);
public static async IAsyncEnumerable<T> Interleave<T>([EnumeratorCancellation] CancellationToken token, IAsyncEnumerable<T>[] sources)
{
if(sources.Length == 0)
yield break;
var enumerators = new List<(IAsyncEnumerator<T> e, Task<bool> t)>(sources.Length);
try
{
for(var i = 0; i < sources.Length; i++)
{
var e = sources[i].GetAsyncEnumerator(token);
enumerators.Add((e, e.MoveNextAsync().AsTask()));
}
do
{
var taskResult = await Task.WhenAny(enumerators.Select(tuple => tuple.t));
var ind = enumerators.FindIndex(tuple => tuple.t == taskResult);
var tuple = enumerators[ind];
enumerators.RemoveAt(ind);
if(taskResult.Result)
{
yield return tuple.e.Current;
enumerators.Add((tuple.e, tuple.e.MoveNextAsync().AsTask()));
}
else
{
try
{
await tuple.e.DisposeAsync();
}
catch
{ //
}
}
} while (enumerators.Count > 0);
}
finally
{
for(var i = 0; i < enumerators.Count; i++)
{
try
{
await enumerators[i].e.DisposeAsync();
}
catch
{ //
}
}
}
}
EDIT 下面不是 OP 想要的,因为 OP 希望返回任何结果,以先到者为准。我将把它留在这里,因为它很好地演示了这个算法。
这是异步 Interleave
或 Merge
算法的完整实现 ,在 SQL 术语中更常见的是 合并串联.
算法如下:
- 该函数接受
params
个来源数组。 - 如果未提供源枚举,则提前退出。
- 创建一个列表来保存枚举数。
- 获取每个枚举器并将其存储在列表中。
- 在循环中,获取每个枚举器和
MoveNextAsync
。 - 如果它 returns
true
,则yield
值并递增循环计数器。如果翻车了,就回到起点。 - 如果 returns
false
,则Dispose
并从列表中删除。 不增加计数器。 - 继续循环,直到没有更多的枚举器。
finally
块处理任何剩余的枚举器。- 还有一个重载来提供取消令牌
public static IAsyncEnumerable<T> Interleave<T>(params IAsyncEnumerable<T>[] sources) =>
Interleave(default, sources);
public static async IAsyncEnumerable<T> Interleave<T>([EnumeratorCancellation] CancellationToken token, IAsyncEnumerable<T>[] sources)
{
if(sources.Length == 0)
yield break;
var enumerators = new List<IAsyncEnumerator<T>>(sources.Length);
try
{
for(var i = 0; i < sources.Length; i++)
enumerators.Add(sources[i].GetAsyncEnumerator(token));
var j = 0;
do
{
if(await enumerators[j].MoveNextAsync())
{
yield return enumerators[j].Current;
j++;
if(j >= enumerators.Count)
j = 0;
}
else
{
try
{
await enumerators[j].DisposeAsync();
}
catch
{ //
}
enumerators.RemoveAt(j);
}
} while (enumerators.Count > 0);
}
finally
{
for(var i = 0; i < enumerators.Count; i++)
{
try
{
await enumerators[i].DisposeAsync();
}
catch
{ //
}
}
}
}
如果您只有固定数量的源枚举,这显然可以大大简化。
这个例子有点做作,因为所有结果都是立即可用的。即使添加了一个小的延迟,结果也是混合的:
var sequence_A = AsyncEnumerable.Range(1, 5)
.SelectAwait(async i =>{ await Task.Delay(i); return $"A{i}";});
var sequence_B = AsyncEnumerable.Range(1, 5)
.SelectAwait(async i =>{ await Task.Delay(i); return $"B{i}";});
var sequence_C = AsyncEnumerable.Range(1, 5)
.SelectAwait(async i =>{ await Task.Delay(i); return $"C{i}";});
var sequence_D = AsyncEnumerable.Range(1, 5)
.SelectAwait(async i =>{ await Task.Delay(i); return $"D{i}";});
await foreach (var item in seq) Console.WriteLine(item);
每次都会产生不同的混合结果:
B1
A1
C1
D1
D2
A2
B2
C2
D3
A3
B3
C3
C4
A4
B4
D4
D5
A5
B5
C5
method's comments 解释它被重新实现是为了更便宜和更公平:
//
// This new implementation of Merge differs from the original one in a few ways:
//
// - It's cheaper because:
// - no conversion from ValueTask<bool> to Task<bool> takes place using AsTask,
// - we don't instantiate Task.WhenAny tasks for each iteration.
// - It's fairer because:
// - the MoveNextAsync tasks are awaited concurently, but completions are queued,
// instead of awaiting a new WhenAny task where "left" sources have preferential
// treatment over "right" sources.
//