如何避免在 ChannelReader.WaitToReadAsync 时抛出 InvalidOperationException?
How can I avoid an InvalidOperationException being thrown when calling ChannelReader.WaitToReadAsync?
我使用 System.Threading.Channels 编写了异步队列。
但是当我运行测试程序时,会在随机的时间点抛出以下异常,并且工作线程停止。
System.InvalidOperationException: The asynchronous operation has not completed.
at System.Threading.Channels.AsyncOperation.ThrowIncompleteOperationException()
at System.Threading.Channels.AsyncOperation`1.GetResult(Int16 token)
at AsyncChannels.Worker() in g:\src\gitrepos\dotnet-sandbox\channelstest\AsyncChannelsTest.cs:line 26
如果捕获并忽略了异常,则代码可以正常工作。
但是我想摆脱原因不明的错误。
这是我的环境和最少的代码。
- TargetFramework = netcoreapp2.1
- System.Threading.Channels版本=4.5.0
using System.Threading.Channels;
using System.Threading;
using System.Threading.Tasks;
using System;
using System.Linq;
/// <summary>
/// Minimal repro from the question: a queue built on an unbounded
/// <see cref="Channel{T}"/> of <see cref="TaskCompletionSource{TResult}"/> items.
/// A dedicated thread drains the channel and completes every queued item with <c>true</c>.
/// </summary>
class AsyncChannels : IDisposable
{
// Unbounded channel carrying the completion sources handed out by Enqueue().
Channel<TaskCompletionSource<bool>> _Channel;
// Dedicated consumer thread running Worker().
Thread _Thread;
// Cancelled by Dispose() to stop the worker loop.
CancellationTokenSource _Cancellation;
/// <summary>
/// Creates the channel and starts the worker thread.
/// </summary>
public AsyncChannels()
{
_Channel = Channel.CreateUnbounded<TaskCompletionSource<bool>>();
_Thread = new Thread(Worker);
_Thread.Start();
// NOTE(review): the thread is started before _Cancellation is assigned, while
// Worker() reads _Cancellation on its first line, so the worker may observe
// a null field if it runs before this assignment.
_Cancellation = new CancellationTokenSource();
}
/// <summary>
/// Worker loop: waits until data is available, then completes every queued item.
/// Exits when cancellation is requested or WaitToReadAsync reports false.
/// </summary>
private void Worker()
{
while (!_Cancellation.IsCancellationRequested)
{
// System.InvalidOperationException is thrown here: WaitToReadAsync returns a
// ValueTask<bool>, and reading .Result blocks on it directly. When the operation
// has not completed yet this fails with "The asynchronous operation has not
// completed." Await it, or convert it with AsTask() before blocking.
if (!_Channel.Reader.WaitToReadAsync(_Cancellation.Token).Result)
{
break;
}
// Drain everything currently buffered and complete each item with true.
while (_Channel.Reader.TryRead(out var item))
{
item.TrySetResult(true);
}
}
}
/// <summary>
/// Requests cancellation, completes the channel writer and waits for the worker thread to exit.
/// </summary>
public void Dispose()
{
_Cancellation.Cancel();
_Channel.Writer.TryComplete();
_Thread.Join();
}
/// <summary>
/// Queues a new completion source and returns its task, which the worker completes with <c>true</c>.
/// </summary>
/// <returns>The task of the queued <see cref="TaskCompletionSource{TResult}"/>.</returns>
public Task<bool> Enqueue()
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
// The return value of TryWrite is ignored here.
_Channel.Writer.TryWrite(tcs);
return tcs.Task;
}
/// <summary>
/// Test driver: enqueues 100000 items one at a time, awaiting each before queuing the next.
/// </summary>
public static async Task Test()
{
using (var queue = new AsyncChannels())
{
for (int i = 0; i < 100000; i++)
{
await queue.Enqueue().ConfigureAwait(false);
}
}
}
}
你不能直接阻塞在 ValueTask<T>
上,像这样:
_Channel.Reader.WaitToReadAsync(_Cancellation.Token).Result
你只能对 ValueTask<T>
做两件事:await
它(且只能一次),或者通过调用 AsTask()
将它转换为 Task<T>
。如果你需要做任何更复杂的事情,比如 await
不止一次或阻塞等待它,那么你需要使用 AsTask()
。
或者,在这种情况下,只需在标准 Channels 消费模式中使用 await
:
/// <summary>
/// Queue built on an unbounded <see cref="Channel{T}"/> of
/// <see cref="TaskCompletionSource{TResult}"/> items. A background task consumes the
/// channel with the standard <c>await WaitToReadAsync</c> / <c>TryRead</c> pattern and
/// completes every queued item with <c>true</c>.
/// </summary>
class AsyncChannels : IDisposable
{
    // Unbounded channel carrying the completion sources to be completed by the worker.
    readonly Channel<TaskCompletionSource<bool>> _Channel;
    // Background consumer task running WorkerAsync().
    readonly Task _Thread;
    // Cancelled by Dispose() to abort a pending WaitToReadAsync.
    readonly CancellationTokenSource _Cancellation;

    /// <summary>
    /// Creates the channel and the cancellation source, then starts the worker task.
    /// </summary>
    public AsyncChannels()
    {
        _Channel = Channel.CreateUnbounded<TaskCompletionSource<bool>>();
        // Create the token source BEFORE starting the worker: WorkerAsync reads
        // _Cancellation immediately, and Task.Run may execute it on a pool thread
        // before the constructor has finished.
        _Cancellation = new CancellationTokenSource();
        _Thread = Task.Run(() => WorkerAsync());
    }

    /// <summary>
    /// Consumer loop: awaits until data is available, then completes every buffered item.
    /// Ends when the channel is completed or the wait is cancelled.
    /// </summary>
    private async Task WorkerAsync()
    {
        try
        {
            // The ValueTask<bool> returned by WaitToReadAsync is awaited exactly once
            // per call, which is the supported way to consume it.
            while (await _Channel.Reader.WaitToReadAsync(_Cancellation.Token))
            {
                while (_Channel.Reader.TryRead(out var item))
                {
                    item.TrySetResult(true);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when Dispose() cancels a pending wait; the worker simply stops.
        }
    }

    /// <summary>
    /// Requests cancellation and completes the channel writer so the worker loop ends.
    /// Does not wait for the worker task to finish.
    /// </summary>
    public void Dispose()
    {
        _Cancellation.Cancel();
        _Channel.Writer.TryComplete();
    }

    // ... (remaining members elided in the original answer)
}
我使用 System.Threading.Channels 编写了异步队列。但是当我运行测试程序时,会在随机的时间点抛出以下异常,并且工作线程停止。
System.InvalidOperationException: The asynchronous operation has not completed.
at System.Threading.Channels.AsyncOperation.ThrowIncompleteOperationException()
at System.Threading.Channels.AsyncOperation`1.GetResult(Int16 token)
at AsyncChannels.Worker() in g:\src\gitrepos\dotnet-sandbox\channelstest\AsyncChannelsTest.cs:line 26
如果捕获并忽略了异常,则代码可以正常工作。 但是我想摆脱原因不明的错误。
这是我的环境和最少的代码。
- TargetFramework = netcoreapp2.1
- System.Threading.Channels版本=4.5.0
using System.Threading.Channels;
using System.Threading;
using System.Threading.Tasks;
using System;
using System.Linq;
/// <summary>
/// Minimal repro from the question: a queue built on an unbounded
/// <see cref="Channel{T}"/> of <see cref="TaskCompletionSource{TResult}"/> items.
/// A dedicated thread drains the channel and completes every queued item with <c>true</c>.
/// </summary>
class AsyncChannels : IDisposable
{
// Unbounded channel carrying the completion sources handed out by Enqueue().
Channel<TaskCompletionSource<bool>> _Channel;
// Dedicated consumer thread running Worker().
Thread _Thread;
// Cancelled by Dispose() to stop the worker loop.
CancellationTokenSource _Cancellation;
/// <summary>
/// Creates the channel and starts the worker thread.
/// </summary>
public AsyncChannels()
{
_Channel = Channel.CreateUnbounded<TaskCompletionSource<bool>>();
_Thread = new Thread(Worker);
_Thread.Start();
// NOTE(review): the thread is started before _Cancellation is assigned, while
// Worker() reads _Cancellation on its first line, so the worker may observe
// a null field if it runs before this assignment.
_Cancellation = new CancellationTokenSource();
}
/// <summary>
/// Worker loop: waits until data is available, then completes every queued item.
/// Exits when cancellation is requested or WaitToReadAsync reports false.
/// </summary>
private void Worker()
{
while (!_Cancellation.IsCancellationRequested)
{
// System.InvalidOperationException is thrown here: WaitToReadAsync returns a
// ValueTask<bool>, and reading .Result blocks on it directly. When the operation
// has not completed yet this fails with "The asynchronous operation has not
// completed." Await it, or convert it with AsTask() before blocking.
if (!_Channel.Reader.WaitToReadAsync(_Cancellation.Token).Result)
{
break;
}
// Drain everything currently buffered and complete each item with true.
while (_Channel.Reader.TryRead(out var item))
{
item.TrySetResult(true);
}
}
}
/// <summary>
/// Requests cancellation, completes the channel writer and waits for the worker thread to exit.
/// </summary>
public void Dispose()
{
_Cancellation.Cancel();
_Channel.Writer.TryComplete();
_Thread.Join();
}
/// <summary>
/// Queues a new completion source and returns its task, which the worker completes with <c>true</c>.
/// </summary>
/// <returns>The task of the queued <see cref="TaskCompletionSource{TResult}"/>.</returns>
public Task<bool> Enqueue()
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
// The return value of TryWrite is ignored here.
_Channel.Writer.TryWrite(tcs);
return tcs.Task;
}
/// <summary>
/// Test driver: enqueues 100000 items one at a time, awaiting each before queuing the next.
/// </summary>
public static async Task Test()
{
using (var queue = new AsyncChannels())
{
for (int i = 0; i < 100000; i++)
{
await queue.Enqueue().ConfigureAwait(false);
}
}
}
}
你不能直接阻塞在 ValueTask<T>
上,像这样:
_Channel.Reader.WaitToReadAsync(_Cancellation.Token).Result
你只能对 ValueTask<T>
做两件事:await
它(且只能一次),或者通过调用 AsTask()
将它转换为 Task<T>
。如果你需要做任何更复杂的事情,比如 await
不止一次或阻塞等待它,那么你需要使用 AsTask()
。
或者,在这种情况下,只需在标准 Channels 消费模式中使用 await
:
/// <summary>
/// Queue built on an unbounded <see cref="Channel{T}"/> of
/// <see cref="TaskCompletionSource{TResult}"/> items. A background task consumes the
/// channel with the standard <c>await WaitToReadAsync</c> / <c>TryRead</c> pattern and
/// completes every queued item with <c>true</c>.
/// </summary>
class AsyncChannels : IDisposable
{
    // Unbounded channel carrying the completion sources to be completed by the worker.
    readonly Channel<TaskCompletionSource<bool>> _Channel;
    // Background consumer task running WorkerAsync().
    readonly Task _Thread;
    // Cancelled by Dispose() to abort a pending WaitToReadAsync.
    readonly CancellationTokenSource _Cancellation;

    /// <summary>
    /// Creates the channel and the cancellation source, then starts the worker task.
    /// </summary>
    public AsyncChannels()
    {
        _Channel = Channel.CreateUnbounded<TaskCompletionSource<bool>>();
        // Create the token source BEFORE starting the worker: WorkerAsync reads
        // _Cancellation immediately, and Task.Run may execute it on a pool thread
        // before the constructor has finished.
        _Cancellation = new CancellationTokenSource();
        _Thread = Task.Run(() => WorkerAsync());
    }

    /// <summary>
    /// Consumer loop: awaits until data is available, then completes every buffered item.
    /// Ends when the channel is completed or the wait is cancelled.
    /// </summary>
    private async Task WorkerAsync()
    {
        try
        {
            // The ValueTask<bool> returned by WaitToReadAsync is awaited exactly once
            // per call, which is the supported way to consume it.
            while (await _Channel.Reader.WaitToReadAsync(_Cancellation.Token))
            {
                while (_Channel.Reader.TryRead(out var item))
                {
                    item.TrySetResult(true);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when Dispose() cancels a pending wait; the worker simply stops.
        }
    }

    /// <summary>
    /// Requests cancellation and completes the channel writer so the worker loop ends.
    /// Does not wait for the worker task to finish.
    /// </summary>
    public void Dispose()
    {
        _Cancellation.Cancel();
        _Channel.Writer.TryComplete();
    }

    // ... (remaining members elided in the original answer)
}