how to read remaining items in `Channel` less than batch size if there is no new items coming to `Channel` within X minutes
I am using Channel from System.Threading.Channels and want to read items in batches (5 items). I have a method like this:
public class Batcher
{
    private readonly Channel<MeasurementViewModel> _channel;
    public Batcher()
    {
        _channel = Channel.CreateUnbounded<MeasurementViewModel>();
    }
    public async Task<MeasurementViewModel[]> ReadBatchAsync(int batchSize, CancellationToken stoppingToken)
    {
        var result = new MeasurementViewModel[batchSize];
        for (var i = 0; i < batchSize; i++)
        {
            result[i] = await _channel.Reader.ReadAsync(stoppingToken);
        }
        return result;
    }
}
In an ASP.NET Core background service, I am using it like this:
public class WriterService : BackgroundService
{
    private readonly Batcher _batcher;
    public WriterService(Batcher batcher)
    {
        _batcher = batcher;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var batchOfItems = await _batcher.ReadBatchAsync(5, stoppingToken);
            var range = string.Join(',', batchOfItems.Select(item => item.Value));
            var x = range;
        }
    }
}
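This works: whenever there are 5 items in the Channel, I get the range.
The problem is: when only 2 items are left in the Channel and no new items have arrived in the Channel for the last 10 minutes, how can I read the remaining 2 items from the Channel?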
You can create a linked CancellationTokenSource, so that you can watch simultaneously for both an external cancellation request and an internally induced timeout. Below is an example of this technique, in the form of a ReadBatchAsync extension method for the ChannelReader<T> class:
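using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Extension methods must live in a non-generic static class.
public static class ChannelReaderExtensions
{
    public static async ValueTask<T[]> ReadBatchAsync<T>(
        this ChannelReader<T> channelReader,
        int batchSize, TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        // Arguments validation omitted
        var items = new List<T>(batchSize);
        using (var linkedCTS
            = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            // The timer starts now, when the method is called.
            linkedCTS.CancelAfter(timeout);
            while (true)
            {
                // The first item is awaited with the external token only, so a
                // batch is never empty; the rest also honor the timeout.
                var token = items.Count == 0 ? cancellationToken : linkedCTS.Token;
                T item;
                try
                {
                    item = await channelReader.ReadAsync(token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    break; // The cancellation was induced by timeout (ignore it)
                }
                catch (ChannelClosedException)
                {
                    // Return a partial batch first; throw on the next call.
                    if (items.Count == 0) throw;
                    break;
                }
                items.Add(item);
                if (items.Count >= batchSize) break;
            }
        }
        return items.ToArray();
    }
}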
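This method produces a batch as soon as the specified timeout has elapsed (measured from the moment ReadBatchAsync is called), or sooner if batchSize is reached, provided that the batch contains at least one item. Otherwise it produces a single-item batch as soon as the first item is received.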
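The heart of the technique is the linked token: one source carries the caller's cancellation, the other adds a timer, and an exception filter tells the two apart. Below is a minimal sketch of that pattern for a single read, assuming an unbounded channel; TimeoutReadSketch and TryReadWithTimeoutAsync are hypothetical names used only for illustration, not part of System.Threading.Channels.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class TimeoutReadSketch
{
    // Hypothetical helper: reads one item, giving up after `timeout`
    // unless the caller's token is canceled first.
    public static async Task<(bool Success, T Item)> TryReadWithTimeoutAsync<T>(
        ChannelReader<T> reader, TimeSpan timeout, CancellationToken cancellationToken)
    {
        // Canceled when either the caller cancels or the timer fires.
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        linkedCts.CancelAfter(timeout);
        try
        {
            T item = await reader.ReadAsync(linkedCts.Token);
            return (true, item);
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            // Only the timer fired: report "no item" instead of throwing.
            // An external cancellation fails the filter and propagates.
            return (false, default(T)!);
        }
    }
}
```

Note that this sketch applies the timeout to every read, whereas ReadBatchAsync deliberately waits for the first item with the external token alone, which is what guarantees that it never returns an empty batch.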
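If the channel has been completed by calling the channel.Writer.Complete() method and contains no more items, the ReadBatchAsync method propagates the same ChannelClosedException that is thrown natively by the ReadAsync method.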
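This relies on the native behavior of ReadAsync after Writer.Complete(): items already buffered remain readable, and only once they are drained does ReadAsync throw ChannelClosedException. A small sketch of that behavior, using only the channel API (CompletionDemo is a hypothetical name):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompletionDemo
{
    // Drains a completed channel with plain ReadAsync and reports
    // which items came out and whether ChannelClosedException was seen.
    public static async Task<(List<int> Drained, bool SawChannelClosed)> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);
        channel.Writer.Complete(); // no more writes; buffered items stay readable

        var drained = new List<int>();
        try
        {
            while (true)
                drained.Add(await channel.Reader.ReadAsync());
        }
        catch (ChannelClosedException)
        {
            // Thrown only after both buffered items were consumed.
            return (drained, true);
        }
    }
}
```

This is why ReadBatchAsync rethrows only when items.Count == 0: a partial batch collected before the channel ran dry is returned first, and the following call throws.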
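If the external CancellationToken is canceled, the cancellation is propagated by throwing an OperationCanceledException. Any items that may already have been extracted internally from the ChannelReader<T> at that point are lost. This makes cancellation a destructive operation, so it is advisable to discard the whole Channel<T> afterwards.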
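Why the cancellation is destructive can be seen with a stripped-down reader that, like ReadBatchAsync, buffers items in a local list. In this sketch (CancellationDemo and ReadUpToAsync are hypothetical names), two items are consumed from the channel, and then the token is canceled while the reader waits for a third:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical helper: buffers up to batchSize items locally,
    // the same way ReadBatchAsync does, but without a timeout.
    public static async Task<List<int>> ReadUpToAsync(
        ChannelReader<int> reader, int batchSize, CancellationToken token)
    {
        var items = new List<int>(batchSize);
        while (items.Count < batchSize)
        {
            // If this throws, the local list (and its items) is discarded.
            items.Add(await reader.ReadAsync(token));
        }
        return items;
    }

    public static async Task<(bool Canceled, int LeftInChannel)> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);
        using var cts = new CancellationTokenSource();
        // Runs synchronously through the two available items,
        // then suspends on the third read.
        Task<List<int>> pending = ReadUpToAsync(channel.Reader, 5, cts.Token);
        cts.Cancel();
        try
        {
            await pending;
            return (false, channel.Reader.Count);
        }
        catch (OperationCanceledException)
        {
            // Both items left the channel but never reached the caller.
            return (true, channel.Reader.Count);
        }
    }
}
```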
Usage example:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (true)
    {
        MeasurementViewModel[] batch;
        try
        {
            batch = await _channel.Reader.ReadBatchAsync(
                5, TimeSpan.FromMinutes(10), stoppingToken);
        }
        catch (OperationCanceledException) { return; }
        catch (ChannelClosedException) { break; }
        Console.WriteLine(String.Join(',', batch.Select(item => item.Value)));
    }
    await _channel.Reader.Completion; // Propagate possible failure
}
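Warning: Currently (.NET 6) the ChannelReader<T>.ReadAsync method is prone to memory leaks under the conditions relevant to this question and this answer. Specifically, when the CancellationToken is canceled, the associated AsyncOperation remains attached to the channel's internal data structures and is not released until an item is written to the channel. So awaiting ChannelReader<T>.ReadAsync repeatedly in a loop with a timer-based CancellationToken is not advisable. References:
- Channels with CancellationTokenSource with timeout memory leak after dispose
- Possible CancellationTokenRegistration leak in AsyncOperation<T>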
For an alternative approach, you can look at .