如何对 IAsyncEnumerable<T> 进行批处理,在连续批次之间强制执行最大间隔策略?
How to batch an IAsyncEnumerable<T>, enforcing a maximum interval policy between consecutive batches?
我有一个异步消息序列(流),有时大量到达,有时零星到达,我想以每批 10 条消息为一组来处理它们。我还想对接收消息和处理消息之间的延迟施加上限,因此如果在收到该批消息的第一条消息后 5 秒过去了,则还应该处理少于 10 条消息的批次。我发现我可以使用 Buffer
operator from the System.Interactive.Async 包解决问题的第一部分:
IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
// Process batch
}
Buffer
操作员的签名:
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, int count);
不幸的是,Buffer
运算符没有带有 TimeSpan
参数的重载,所以我无法轻易解决问题的第二部分。我将不得不自己以某种方式实现一个带有计时器的批处理操作符。我的问题是:如何实现具有以下签名的 Buffer
运算符的变体?
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count);
timeSpan
参数应该像这样影响 Buffer
运算符的行为:
- 在发出前一批之后
timeSpan
过去时(或最初在调用 Buffer
方法之后),必须发出一批。
- 如果在发出上一个批次后
timeSpan
已经过去,并且在此期间没有收到任何消息,则必须发出一个空批次。
- 比每个
timeSpan
更频繁地发出批次意味着批次已满。在 timeSpan
过去之前发出少于 count
消息的批处理是不可取的。
我可以根据需要向我的项目添加外部依赖项,例如 System.Interactive.Async or the System.Linq.Async 包。
P.S。这个问题的灵感来自 a recent question 与通道和内存泄漏相关的问题。
这里有两种方法可以解决这个问题。第一个有缺陷,但由于其极其简单,我还是将其发布。一个 Buffer
operator with a TimeSpan
parameter already exists in the System.Reactive package, and converters between asynchronous and observable sequences exist in the System.Linq.Async 包。所以这只是将三个已经可用的运算符链接在一起的问题:
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
return source.ToObservable().Buffer(timeSpan, count).ToAsyncEnumerable();
}
不幸的是,这种巧妙的方法是有缺陷的,因为从拉动模型转变为推动模型并返回拉动模型会产生副作用。发生的情况是中间可观察序列在订阅时开始积极地拉取源 IAsyncEnumerable
,而不管结果 IAsyncEnumerable
是如何拉取的。因此,不是结果序列的消费者成为枚举的驱动程序,而是枚举以源序列允许的最大速度在后台静默发生,并且生成的消息缓冲在内部队列中。因此,不仅有可能对消息的处理施加隐藏的延迟,而且内存消耗也有可能失控地飙升。
第二种是实践方法,它使用Task.Delay
method as a timer, and the Task.WhenAny
方法来协调计时器和枚举任务。这种方法的行为类似于基于 Rx 的方法,除了源序列的枚举是由结果序列的消费者驱动的,正如人们所期望的那样。
/// <summary>
/// Splits the elements of a sequence into chunks that are sent out when either
/// they're full, or a given amount of time has elapsed.
/// </summary>
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (timeSpan < TimeSpan.Zero) throw new ArgumentNullException(nameof(timeSpan));
if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
return Implementation();
async IAsyncEnumerable<IList<TSource>> Implementation(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var timerCts = new CancellationTokenSource();
var delayTask = Task.Delay(timeSpan, timerCts.Token);
(ValueTask<bool> ValueTask, Task<bool> Task) moveNext = default;
using var linkedCts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
var enumerator = source.GetAsyncEnumerator(linkedCts.Token);
try
{
moveNext = (enumerator.MoveNextAsync(), null);
var buffer = new List<TSource>(count);
ExceptionDispatchInfo error = null;
while (true)
{
Task completedTask = null;
if (!moveNext.ValueTask.IsCompleted)
{
// Preserve the ValueTask, if it's not preserved already.
if (moveNext.Task == null)
{
var preserved = moveNext.ValueTask.AsTask();
moveNext = (new ValueTask<bool>(preserved), preserved);
}
completedTask = await Task.WhenAny(moveNext.Task, delayTask)
.ConfigureAwait(false);
}
if (completedTask == delayTask)
{
Debug.Assert(delayTask.IsCompleted);
yield return buffer.ToArray(); // It's OK if the buffer is empty.
buffer.Clear();
delayTask = Task.Delay(timeSpan, timerCts.Token);
}
else
{
Debug.Assert(moveNext.ValueTask.IsCompleted);
// Await a copy, to prevent a second await on finally.
var moveNextCopy = moveNext.ValueTask;
moveNext = default;
bool moved;
try { moved = await moveNextCopy.ConfigureAwait(false); }
catch (Exception ex)
{
error = ExceptionDispatchInfo.Capture(ex); break;
}
if (!moved) break;
buffer.Add(enumerator.Current);
if (buffer.Count == count)
{
timerCts.Cancel(); timerCts.Dispose();
timerCts = new CancellationTokenSource();
yield return buffer.ToArray();
buffer.Clear();
delayTask = Task.Delay(timeSpan, timerCts.Token);
}
try { moveNext = (enumerator.MoveNextAsync(), null); }
catch (Exception ex)
{
error = ExceptionDispatchInfo.Capture(ex); break;
}
}
}
if (buffer.Count > 0) yield return buffer.ToArray();
error?.Throw();
}
finally
{
// The finally runs when an enumerator created by this method is disposed.
timerCts.Cancel(); timerCts.Dispose();
// Prevent fire-and-forget, otherwise the DisposeAsync() might throw.
// Cancel the async-enumerator, for more responsive completion.
// Swallow MoveNextAsync errors, but propagate DisposeAsync errors.
linkedCts.Cancel();
try { await moveNext.ValueTask.ConfigureAwait(false); } catch { }
await enumerator.DisposeAsync().ConfigureAwait(false);
}
}
}
已采取措施避免泄漏即发即弃 MoveNextAsync
操作或计时器。
仅当 MoveNextAsync
调用 returns 未完成的 ValueTask<bool>
时才会分配 Task
包装器。
此实现是非破坏性的,这意味着从源序列中消耗的任何元素都不会丢失。如果源序列失败或枚举被取消,任何缓冲的元素都将在错误传播之前发出。
如何使用 Channel
来实现所需的功能?如果使用类似这种扩展方法从队列中读取直到超时到期,是否存在任何缺陷?
public static async Task<List<T>> ReadWithTimeoutAsync<T>(this ChannelReader<T> reader, TimeSpan readTOut, CancellationToken cancellationToken)
{
var timeoutTokenSrc = new CancellationTokenSource();
timeoutTokenSrc.CancelAfter(readTOut);
var messages = new List<T>();
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, cancellationToken))
{
try
{
await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
{
messages.Add(item);
linkedCts.Token.ThrowIfCancellationRequested();
}
Console.WriteLine("All messages read.");
}
catch (OperationCanceledException)
{
if (timeoutTokenSrc.Token.IsCancellationRequested)
{
Console.WriteLine($"Delay ({readTOut.Milliseconds} msec) for reading items from message channel has expired.");
}
else if (cancellationToken.IsCancellationRequested)
{
Console.WriteLine("Cancelling per user request.");
cancellationToken.ThrowIfCancellationRequested();
}
}
}
timeoutTokenSrc.Dispose();
return messages;
}
将超时与最大值相结合。批量大小,可以添加一个令牌源:
public static async Task<List<T>> ReadBatchWithTimeoutAsync<T>(this ChannelReader<T> reader, int maxBatchSize, TimeSpan readTOut, CancellationToken cancellationToken)
{
var timeoutTokenSrc = new CancellationTokenSource();
timeoutTokenSrc.CancelAfter(readTOut);
var maxSizeTokenSrc = new CancellationTokenSource();
var messages = new List<T>();
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, maxSizeTokenSrc.Token, cancellationToken))
{
try
{
await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
{
messages.Add(item);
if (messages.Count >= maxBatchSize)
{
maxSizeTokenSrc.Cancel();
}
linkedCts.Token.ThrowIfCancellationRequested();
}....
我有一个异步消息序列(流),有时大量到达,有时零星到达,我想以每批 10 条消息为一组来处理它们。我还想对接收消息和处理消息之间的延迟施加上限,因此如果在收到该批消息的第一条消息后 5 秒过去了,则还应该处理少于 10 条消息的批次。我发现我可以使用 Buffer
operator from the System.Interactive.Async 包解决问题的第一部分:
IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
// Process batch
}
Buffer
操作员的签名:
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, int count);
不幸的是,Buffer
运算符没有带有 TimeSpan
参数的重载,所以我无法轻易解决问题的第二部分。我将不得不自己以某种方式实现一个带有计时器的批处理操作符。我的问题是:如何实现具有以下签名的 Buffer
运算符的变体?
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count);
timeSpan
参数应该像这样影响 Buffer
运算符的行为:
- 在发出前一批之后
timeSpan
过去时(或最初在调用Buffer
方法之后),必须发出一批。 - 如果在发出上一个批次后
timeSpan
已经过去,并且在此期间没有收到任何消息,则必须发出一个空批次。 - 比每个
timeSpan
更频繁地发出批次意味着批次已满。在timeSpan
过去之前发出少于count
消息的批处理是不可取的。
我可以根据需要向我的项目添加外部依赖项,例如 System.Interactive.Async or the System.Linq.Async 包。
P.S。这个问题的灵感来自 a recent question 与通道和内存泄漏相关的问题。
这里有两种方法可以解决这个问题。第一个有缺陷,但由于其极其简单,我还是将其发布。一个 Buffer
operator with a TimeSpan
parameter already exists in the System.Reactive package, and converters between asynchronous and observable sequences exist in the System.Linq.Async 包。所以这只是将三个已经可用的运算符链接在一起的问题:
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
return source.ToObservable().Buffer(timeSpan, count).ToAsyncEnumerable();
}
不幸的是,这种巧妙的方法是有缺陷的,因为从拉动模型转变为推动模型并返回拉动模型会产生副作用。发生的情况是中间可观察序列在订阅时开始积极地拉取源 IAsyncEnumerable
,而不管结果 IAsyncEnumerable
是如何拉取的。因此,不是结果序列的消费者成为枚举的驱动程序,而是枚举以源序列允许的最大速度在后台静默发生,并且生成的消息缓冲在内部队列中。因此,不仅有可能对消息的处理施加隐藏的延迟,而且内存消耗也有可能失控地飙升。
第二种是实践方法,它使用Task.Delay
method as a timer, and the Task.WhenAny
方法来协调计时器和枚举任务。这种方法的行为类似于基于 Rx 的方法,除了源序列的枚举是由结果序列的消费者驱动的,正如人们所期望的那样。
/// <summary>
/// Splits the elements of a sequence into chunks that are sent out when either
/// they're full, or a given amount of time has elapsed.
/// </summary>
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (timeSpan < TimeSpan.Zero) throw new ArgumentNullException(nameof(timeSpan));
if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
return Implementation();
async IAsyncEnumerable<IList<TSource>> Implementation(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var timerCts = new CancellationTokenSource();
var delayTask = Task.Delay(timeSpan, timerCts.Token);
(ValueTask<bool> ValueTask, Task<bool> Task) moveNext = default;
using var linkedCts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
var enumerator = source.GetAsyncEnumerator(linkedCts.Token);
try
{
moveNext = (enumerator.MoveNextAsync(), null);
var buffer = new List<TSource>(count);
ExceptionDispatchInfo error = null;
while (true)
{
Task completedTask = null;
if (!moveNext.ValueTask.IsCompleted)
{
// Preserve the ValueTask, if it's not preserved already.
if (moveNext.Task == null)
{
var preserved = moveNext.ValueTask.AsTask();
moveNext = (new ValueTask<bool>(preserved), preserved);
}
completedTask = await Task.WhenAny(moveNext.Task, delayTask)
.ConfigureAwait(false);
}
if (completedTask == delayTask)
{
Debug.Assert(delayTask.IsCompleted);
yield return buffer.ToArray(); // It's OK if the buffer is empty.
buffer.Clear();
delayTask = Task.Delay(timeSpan, timerCts.Token);
}
else
{
Debug.Assert(moveNext.ValueTask.IsCompleted);
// Await a copy, to prevent a second await on finally.
var moveNextCopy = moveNext.ValueTask;
moveNext = default;
bool moved;
try { moved = await moveNextCopy.ConfigureAwait(false); }
catch (Exception ex)
{
error = ExceptionDispatchInfo.Capture(ex); break;
}
if (!moved) break;
buffer.Add(enumerator.Current);
if (buffer.Count == count)
{
timerCts.Cancel(); timerCts.Dispose();
timerCts = new CancellationTokenSource();
yield return buffer.ToArray();
buffer.Clear();
delayTask = Task.Delay(timeSpan, timerCts.Token);
}
try { moveNext = (enumerator.MoveNextAsync(), null); }
catch (Exception ex)
{
error = ExceptionDispatchInfo.Capture(ex); break;
}
}
}
if (buffer.Count > 0) yield return buffer.ToArray();
error?.Throw();
}
finally
{
// The finally runs when an enumerator created by this method is disposed.
timerCts.Cancel(); timerCts.Dispose();
// Prevent fire-and-forget, otherwise the DisposeAsync() might throw.
// Cancel the async-enumerator, for more responsive completion.
// Swallow MoveNextAsync errors, but propagate DisposeAsync errors.
linkedCts.Cancel();
try { await moveNext.ValueTask.ConfigureAwait(false); } catch { }
await enumerator.DisposeAsync().ConfigureAwait(false);
}
}
}
已采取措施避免泄漏即发即弃 MoveNextAsync
操作或计时器。
仅当 MoveNextAsync
调用 returns 未完成的 ValueTask<bool>
时才会分配 Task
包装器。
此实现是非破坏性的,这意味着从源序列中消耗的任何元素都不会丢失。如果源序列失败或枚举被取消,任何缓冲的元素都将在错误传播之前发出。
如何使用 Channel
来实现所需的功能?如果使用类似这种扩展方法从队列中读取直到超时到期,是否存在任何缺陷?
public static async Task<List<T>> ReadWithTimeoutAsync<T>(this ChannelReader<T> reader, TimeSpan readTOut, CancellationToken cancellationToken)
{
var timeoutTokenSrc = new CancellationTokenSource();
timeoutTokenSrc.CancelAfter(readTOut);
var messages = new List<T>();
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, cancellationToken))
{
try
{
await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
{
messages.Add(item);
linkedCts.Token.ThrowIfCancellationRequested();
}
Console.WriteLine("All messages read.");
}
catch (OperationCanceledException)
{
if (timeoutTokenSrc.Token.IsCancellationRequested)
{
Console.WriteLine($"Delay ({readTOut.Milliseconds} msec) for reading items from message channel has expired.");
}
else if (cancellationToken.IsCancellationRequested)
{
Console.WriteLine("Cancelling per user request.");
cancellationToken.ThrowIfCancellationRequested();
}
}
}
timeoutTokenSrc.Dispose();
return messages;
}
将超时与最大值相结合。批量大小,可以添加一个令牌源:
public static async Task<List<T>> ReadBatchWithTimeoutAsync<T>(this ChannelReader<T> reader, int maxBatchSize, TimeSpan readTOut, CancellationToken cancellationToken)
{
var timeoutTokenSrc = new CancellationTokenSource();
timeoutTokenSrc.CancelAfter(readTOut);
var maxSizeTokenSrc = new CancellationTokenSource();
var messages = new List<T>();
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, maxSizeTokenSrc.Token, cancellationToken))
{
try
{
await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
{
messages.Add(item);
if (messages.Count >= maxBatchSize)
{
maxSizeTokenSrc.Cancel();
}
linkedCts.Token.ThrowIfCancellationRequested();
}....