从阻塞 IEnumerable 读取

Read from blocking IEnumerable

我从库函数中收到阻塞的 IEnumerable(就像永无止境的流)。有时它包含一些消息,有时包含很多消息。我需要从中获取所有现有消息,我不想等待新消息。

此代码获取现有消息并等待 10 秒超时。

var cancellationTokenSource = new CancellationTokenSource(10000);
foreach (var data in consumer.Consume(cancellationTokenSource.Token))
{
    Console.WriteLine(data.message);
}

我应该如何修改我的代码?

public IEnumerable<Message> Consume(CancellationToken? cancellationToken = null)

我会进一步抽象出问题。我个人不会在循环数据时执行消费。我将实现一个线程计时器,该计时器排队到堆栈或队列中。然后让你的循环 Pop 项目进行处理。您甚至可能会发现您想要使用 ConcurrentQueue 来应对波动性。

我认为 BlockingCollection.TryTake 会满足您的需求

If the collection is empty, this method immediately returns false.

while (bc.TryTake(out T))
{
}

解决方法

所以,我们有无限的 IEnumerable。如果其中没有消息,"foreach" 将等待它们的出现。幸运的是,我们的 "Consume" 方法接受取消标记,我们可以在循环内更改它。在处理完来自 IEnumerable 的第一条消息后,我设置了 "CancelAfter(1000)" - 类似于超时。如果新消息在一秒钟内出现,计时器将重置。当队列为空时,计时器触发,我们打破循环(尝试...捕获需要的 OperationCanceledException)

var cancellationTokenSource = new CancellationTokenSource(1000);
foreach (var data in consumer.Consume(cancellationTokenSource.Token))
{
    Console.WriteLine(data.message);
    cancellationTokenSource.CancelAfter(1000);
}