BufferBlock.ReceiveAsync(超时) 挂起但 BufferBlock.ReceiveAsync() 工作正常
BufferBlock.ReceiveAsync(timeout) hang but BufferBlock.ReceiveAsync() works OK
要么我做错了什么,但下面从来没有 return 它永远挂在 ReceiveAsync
上,尽管指定了 1 秒的超时。
我希望它在超时后为 return 空值。
/* snipped MyContainer class */
private readonly BufferBlock<byte[]> queue = new BufferBlock<byte[]>();
public async Task ExecuteAsync(CancellationToken stoppingToken)
{
// makes no difference if creating with TaskCreationOptions.LongRunning
await Task
.Factory
.StartNew(async () =>
{
while (stoppingToken.IsCancellationRequested == false)
{
// we get here OK, but no further if i use TimeSpan for delay
// without Timespan i.e. ReceiveAsync() only, it does **not** hang
var item = await
this
.queue
.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
// process it, but we never get here we sleep forever
await ProcessAsync(item);
}
} /*,TaskCreationOptions.LongRunning*/);
// we get here and print the below OK
Console.WriteLine("thread created and running");
}
// this is called by the original (or some other) thread
// either if i call this or not, the above thread code still locks on ReceiveAsync
public void Add(byte[] data)
{
Console.WriteLine("adding");
this.queue.Post(data);
Console.WriteLine("done"); // gets posted OK
}
重要更新 - 如果我不指定延迟就可以正常工作
var item = await this.queue.ReceiveAsync());
如果我消除延迟,代码可以正常工作,但是我每秒都会做一些后台管理(用于数据包计数器等),所以如果在 1 秒内没有收到任何消息,唤醒这一点很重要。
其他说明:
我从一个通用的 .net worker 主机调用上面的代码:
public class Worker : BackgroundService
{
private readonly MyContainer containerClass;
private readonly ILogger<Worker> logger;
public Worker(MyContainer containerClass, ILogger<Worker> logger)
{
this.containerClass = containerClass;
this.logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
this.containerClass.ExecuteAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
this.logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
await Task.Delay(1000, stoppingToken);
}
}
}
上面是IHostBuilder
建worker后调用的,我调用的是Host.Run()
.
我的理解是(我显然需要继续努力!)因为我创建了线程,它应该 运行 完全独立于(而不是阻塞)created/called 它的线程。 .. 换句话说,它应该能够在线程本身内调用 ReceiveAsync 而不会被阻塞。
感谢所有回复的人,最后感谢 Enrico(随时 copy/paste,我会把答案分配给你)代码实际上 运行 OK。
抛出了一个 TimeoutException 异常,但没有被我的代码或 Visual Studio.
捕获
根据https://docs.microsoft.com/en-us/visualstudio/debugger/managing-exceptions-with-the-debugger?view=vs-2019启用所有 CLR 异常,异常开始被抛出。
然后我在代码中处理了异常,并且能够按照我的设计要求继续进行:
public Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task
.Factory
.StartNew(async () => {
while (stoppingToken.IsCancellationRequested == false)
{
try
{
var ts = TimeSpan.FromSeconds(UpdateFrequencySeconds);
var item = await this.queue.ReceiveAsync(ts);
await ProcessAsync(item);
}
catch (TimeoutException)
{
// this is ok, timer expired
}
catch (Exception e)
{
this.logger.LogError(e.ToString());
}
UpdateCounters();
}
await StopAsync();
},
stoppingToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default)
.Unwrap();
}
将 Task.Factory.StartNew
与异步委托一起使用会创建嵌套任务:
Task<Task> nestedTask = Task.Factory.StartNew(async () => { //...
您正在等待外部任务,而不是内部任务,因此内部任务变成了即发即弃的任务。可以通过两次使用 await
运算符在一行中等待两个任务:
await await Task.Factory.StartNew(async () => { //...
或者,您可以使用 Unwrap
方法将这两个任务合二为一。
await Task.Factory.StartNew(async () => { /* ... */ }).Unwrap();
...或者更好地使用 Task.Run
方法而不是 Task.Factory.StartNew
,因为前者理解异步委托,并为您解包:
await Task.Run(async () => { //...
如果您对 Task.Factory.StartNew
和 Task.Run
之间的差异感兴趣,您可以阅读一篇内容丰富的文章 here。
要么我做错了什么,但下面从来没有 return 它永远挂在 ReceiveAsync
上,尽管指定了 1 秒的超时。
我希望它在超时后为 return 空值。
/* snipped MyContainer class */
private readonly BufferBlock<byte[]> queue = new BufferBlock<byte[]>();
public async Task ExecuteAsync(CancellationToken stoppingToken)
{
// makes no difference if creating with TaskCreationOptions.LongRunning
await Task
.Factory
.StartNew(async () =>
{
while (stoppingToken.IsCancellationRequested == false)
{
// we get here OK, but no further if i use TimeSpan for delay
// without Timespan i.e. ReceiveAsync() only, it does **not** hang
var item = await
this
.queue
.ReceiveAsync(TimeSpan.FromMilliseconds(1000));
// process it, but we never get here we sleep forever
await ProcessAsync(item);
}
} /*,TaskCreationOptions.LongRunning*/);
// we get here and print the below OK
Console.WriteLine("thread created and running");
}
// this is called by the original (or some other) thread
// either if i call this or not, the above thread code still locks on ReceiveAsync
public void Add(byte[] data)
{
Console.WriteLine("adding");
this.queue.Post(data);
Console.WriteLine("done"); // gets posted OK
}
重要更新 - 如果我不指定延迟就可以正常工作
var item = await this.queue.ReceiveAsync());
如果我消除延迟,代码可以正常工作,但是我每秒都会做一些后台管理(用于数据包计数器等),所以如果在 1 秒内没有收到任何消息,唤醒这一点很重要。
其他说明:
我从一个通用的 .net worker 主机调用上面的代码:
public class Worker : BackgroundService
{
private readonly MyContainer containerClass;
private readonly ILogger<Worker> logger;
public Worker(MyContainer containerClass, ILogger<Worker> logger)
{
this.containerClass = containerClass;
this.logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
this.containerClass.ExecuteAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
this.logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
await Task.Delay(1000, stoppingToken);
}
}
}
上面是IHostBuilder
建worker后调用的,我调用的是Host.Run()
.
我的理解是(我显然需要继续努力!)因为我创建了线程,它应该 运行 完全独立于(而不是阻塞)created/called 它的线程。 .. 换句话说,它应该能够在线程本身内调用 ReceiveAsync 而不会被阻塞。
感谢所有回复的人,最后感谢 Enrico(随时 copy/paste,我会把答案分配给你)代码实际上 运行 OK。
抛出了一个 TimeoutException 异常,但没有被我的代码或 Visual Studio.
捕获根据https://docs.microsoft.com/en-us/visualstudio/debugger/managing-exceptions-with-the-debugger?view=vs-2019启用所有 CLR 异常,异常开始被抛出。
然后我在代码中处理了异常,并且能够按照我的设计要求继续进行:
public Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task
.Factory
.StartNew(async () => {
while (stoppingToken.IsCancellationRequested == false)
{
try
{
var ts = TimeSpan.FromSeconds(UpdateFrequencySeconds);
var item = await this.queue.ReceiveAsync(ts);
await ProcessAsync(item);
}
catch (TimeoutException)
{
// this is ok, timer expired
}
catch (Exception e)
{
this.logger.LogError(e.ToString());
}
UpdateCounters();
}
await StopAsync();
},
stoppingToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default)
.Unwrap();
}
将 Task.Factory.StartNew
与异步委托一起使用会创建嵌套任务:
Task<Task> nestedTask = Task.Factory.StartNew(async () => { //...
您正在等待外部任务,而不是内部任务,因此内部任务变成了即发即弃的任务。可以通过两次使用 await
运算符在一行中等待两个任务:
await await Task.Factory.StartNew(async () => { //...
或者,您可以使用 Unwrap
方法将这两个任务合二为一。
await Task.Factory.StartNew(async () => { /* ... */ }).Unwrap();
...或者更好地使用 Task.Run
方法而不是 Task.Factory.StartNew
,因为前者理解异步委托,并为您解包:
await Task.Run(async () => { //...
如果您对 Task.Factory.StartNew
和 Task.Run
之间的差异感兴趣,您可以阅读一篇内容丰富的文章 here。