从阻塞 IEnumerable 读取
Read from blocking IEnumerable
我从库函数中收到阻塞的 IEnumerable(就像永无止境的流)。有时它包含一些消息,有时包含很多消息。我需要从中获取所有现有消息,我不想等待新消息。
此代码获取现有消息并等待 10 秒超时。
var cancellationTokenSource = new CancellationTokenSource(10000);
foreach (var data in consumer.Consume(cancellationTokenSource.Token))
{
Console.WriteLine(data.message);
}
我应该如何修改我的代码?
public IEnumerable<Message> Consume(CancellationToken? cancellationToken = null)
我会进一步抽象出问题。我个人不会在循环数据时执行消费。我将实现一个线程计时器,该计时器排队到堆栈或队列中。然后让你的循环 Pop 项目进行处理。您甚至可能会发现您想要使用 ConcurrentQueue 来应对波动性。
我认为 BlockingCollection.TryTake 会满足您的需求
If the collection is empty, this method immediately returns false.
while (bc.TryTake(out T))
{
}
解决方法
所以,我们有无限的 IEnumerable。如果其中没有消息,"foreach" 将等待它们的出现。幸运的是,我们的 "Consume" 方法接受取消标记,我们可以在循环内更改它。在处理完来自 IEnumerable 的第一条消息后,我设置了 "CancelAfter(1000)" - 类似于超时。如果新消息在一秒钟内出现,计时器将重置。当队列为空时,计时器触发,我们打破循环(尝试...捕获需要的 OperationCanceledException)
var cancellationTokenSource = new CancellationTokenSource(1000);
foreach (var data in consumer.Consume(cancellationTokenSource.Token))
{
Console.WriteLine(data.message);
cancellationTokenSource.CancelAfter(1000);
}
我从库函数中收到阻塞的 IEnumerable(就像永无止境的流)。有时它包含一些消息,有时包含很多消息。我需要从中获取所有现有消息,我不想等待新消息。
此代码获取现有消息并等待 10 秒超时。
var cancellationTokenSource = new CancellationTokenSource(10000);
foreach (var data in consumer.Consume(cancellationTokenSource.Token))
{
Console.WriteLine(data.message);
}
我应该如何修改我的代码?
public IEnumerable<Message> Consume(CancellationToken? cancellationToken = null)
我会进一步抽象出问题。我个人不会在循环数据时执行消费。我将实现一个线程计时器,该计时器排队到堆栈或队列中。然后让你的循环 Pop 项目进行处理。您甚至可能会发现您想要使用 ConcurrentQueue 来应对波动性。
我认为 BlockingCollection.TryTake 会满足您的需求
If the collection is empty, this method immediately returns false.
while (bc.TryTake(out T))
{
}
解决方法
所以,我们有无限的 IEnumerable。如果其中没有消息,"foreach" 将等待它们的出现。幸运的是,我们的 "Consume" 方法接受取消标记,我们可以在循环内更改它。在处理完来自 IEnumerable 的第一条消息后,我设置了 "CancelAfter(1000)" - 类似于超时。如果新消息在一秒钟内出现,计时器将重置。当队列为空时,计时器触发,我们打破循环(尝试...捕获需要的 OperationCanceledException)
var cancellationTokenSource = new CancellationTokenSource(1000);
foreach (var data in consumer.Consume(cancellationTokenSource.Token))
{
Console.WriteLine(data.message);
cancellationTokenSource.CancelAfter(1000);
}