我怎样才能在 returns 之前延迟我的任务?
How can I delay my task before it returns?
我对 acceptor.IsStarted.Should().BeTrue();
的断言(请参阅下面的单元测试)总是失败,因为它被评估得太早了。立即调用 await task
returns,没有给 this.acceptor.Start()
足够的时间启动。
我想让 FixAcceptor()
的启动更加确定性,因此引入了参数 TimeSpan startupDelay
。
但是我根本不知道在哪里以及如何延迟启动。
在 this.acceptor.Start()
和 this.IsStarted = true
之间放置一个额外的 Thread.Sleep(startupDelay)
不会有帮助,因为它只会阻塞工作任务本身,而不是调用线程。
我希望清楚我想要存档的内容以及我正在努力解决的问题。提前致谢。
public class FixAcceptor
{
// Type provided by QuickFix.net
private readonly ThreadedSocketAcceptor acceptor;
public FixAcceptor(IFixSettings settings)
{
// Shortened
}
public bool IsStarted { get; private set; }
public async void Run(CancellationToken cancellationToken, TimeSpan startupDelay)
{
var task = Task.Run(() =>
{
cancellationToken.ThrowIfCancellationRequested();
this.acceptor.Start();
this.IsStarted = true;
while (true)
{
// Stop if token has been canceled
if (cancellationToken.IsCancellationRequested)
{
this.acceptor.Stop();
this.IsStarted = false;
cancellationToken.ThrowIfCancellationRequested();
}
// Save some CPU cycles
Thread.Sleep(TimeSpan.FromSeconds(1));
}
}, cancellationToken);
try
{
await task;
}
catch (OperationCanceledException e)
{
Debug.WriteLine(e.Message);
}
}
}
以及对应的消费代码
[Fact]
public void Should_Run_Acceptor_And_Stop_By_CancelationToken()
{
// Arrange
var acceptor = new FixAcceptor(new FixAcceptorSettings("test_acceptor.cfg", this.logger));
var tokenSource = new CancellationTokenSource();
// Act
tokenSource.CancelAfter(TimeSpan.FromSeconds(10));
acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));
// Assert
acceptor.IsStarted.Should().BeTrue();
IsListeningOnTcpPort(9823).Should().BeTrue();
// Wait for cancel event to occur
Thread.Sleep(TimeSpan.FromSeconds(15));
acceptor.IsStarted.Should().BeFalse();
}
我建议您构建您的 FixAcceptor.Run()
方法有点不同
public async Task Run(CancellationToken cancellationToken, TimeSpan startupDelay)
{
var task = Task.Run(async () =>
{
try
{
cancellationToken.ThrowIfCancellationRequested();
this.acceptor.Start();
this.IsStarted = true;
while (true)
{
// Stop if token has been canceled
if (cancellationToken.IsCancellationRequested)
{
this.acceptor.Stop();
this.IsStarted = false;
cancellationToken.ThrowIfCancellationRequested();
}
// Save some CPU cycles
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
catch (OperationCanceledException e)
{
Debut.WriteLine(e.Message);
}
}, cancellationToken);
await Task.Delay(startupDelay);
}
所以异常处理在内部任务中,Run
方法 returns 一个 Task
在 startupDelay
之后完成。
(我还把 Thread.Sleep()
换成了 Task.Delay()
)
然后在测试方法中,您可以等待 Run
返回的 Task
[Fact]
public async Task Should_Run_Acceptor_And_Stop_By_CancelationToken()
{
// Arrange
var acceptor = new FixAcceptor(new FixAcceptorSettings("test_acceptor.cfg", this.logger));
var tokenSource = new CancellationTokenSource();
// Act
tokenSource.CancelAfter(TimeSpan.FromSeconds(10));
await acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));
// Assert
acceptor.IsStarted.Should().BeTrue();
IsListeningOnTcpPort(9823).Should().BeTrue();
// Wait for cancel event to occur
Thread.Sleep(TimeSpan.FromSeconds(15));
acceptor.IsStarted.Should().BeFalse();
}
制作方法应该没问题async
(它像你使用 xunit 一样接缝)
不推荐通过添加时间延迟来实现确定性。您可以通过使用 TaskCompletionSource
控制任务在恰当的时刻完成来实现 100% 的确定性:
public Task<bool> Start(CancellationToken cancellationToken)
{
var startTcs = new TaskCompletionSource<bool>();
var task = Task.Run(() =>
{
cancellationToken.ThrowIfCancellationRequested();
this.acceptor.Start();
this.IsStarted = true;
startTcs.TrySetResult(true); // Signal that the starting phase is completed
while (true)
{
// ...
}
}, cancellationToken);
HandleTaskCompletion();
return startTcs.Task;
async void HandleTaskCompletion() // async void method = should never throw
{
try
{
await task;
}
catch (OperationCanceledException ex)
{
Debug.WriteLine(ex.Message);
startTcs.TrySetResult(false); // Signal that start failed
}
catch
{
startTcs.TrySetResult(false); // Signal that start failed
}
}
}
然后在你的测试中替换这一行:
acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));
...有了这个:
bool startResult = await acceptor.Start(tokenSource.Token);
引起我注意的另一个问题是 bool IsStarted
属性,它从一个线程变异并被另一个线程观察到,没有同步。这不是一个真正的问题,因为您可以依赖在每个 await
上自动插入的未记录的内存屏障,并且非常有信心您不会遇到可见性问题,但是如果您想要更加确定,您可以使用 lock
(最稳健)同步访问,或使用 volatile
私有字段备份 属性,如下所示:
private volatile bool _isStarted;
public bool IsStarted => _isStarted;
我对 acceptor.IsStarted.Should().BeTrue();
的断言(请参阅下面的单元测试)总是失败,因为它被评估得太早了。立即调用 await task
returns,没有给 this.acceptor.Start()
足够的时间启动。
我想让 FixAcceptor()
的启动更加确定性,因此引入了参数 TimeSpan startupDelay
。
但是我根本不知道在哪里以及如何延迟启动。
在 this.acceptor.Start()
和 this.IsStarted = true
之间放置一个额外的 Thread.Sleep(startupDelay)
不会有帮助,因为它只会阻塞工作任务本身,而不是调用线程。
我希望清楚我想要存档的内容以及我正在努力解决的问题。提前致谢。
public class FixAcceptor
{
// Type provided by QuickFix.net
private readonly ThreadedSocketAcceptor acceptor;
public FixAcceptor(IFixSettings settings)
{
// Shortened
}
public bool IsStarted { get; private set; }
public async void Run(CancellationToken cancellationToken, TimeSpan startupDelay)
{
var task = Task.Run(() =>
{
cancellationToken.ThrowIfCancellationRequested();
this.acceptor.Start();
this.IsStarted = true;
while (true)
{
// Stop if token has been canceled
if (cancellationToken.IsCancellationRequested)
{
this.acceptor.Stop();
this.IsStarted = false;
cancellationToken.ThrowIfCancellationRequested();
}
// Save some CPU cycles
Thread.Sleep(TimeSpan.FromSeconds(1));
}
}, cancellationToken);
try
{
await task;
}
catch (OperationCanceledException e)
{
Debug.WriteLine(e.Message);
}
}
}
以及对应的消费代码
[Fact]
public void Should_Run_Acceptor_And_Stop_By_CancelationToken()
{
// Arrange
var acceptor = new FixAcceptor(new FixAcceptorSettings("test_acceptor.cfg", this.logger));
var tokenSource = new CancellationTokenSource();
// Act
tokenSource.CancelAfter(TimeSpan.FromSeconds(10));
acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));
// Assert
acceptor.IsStarted.Should().BeTrue();
IsListeningOnTcpPort(9823).Should().BeTrue();
// Wait for cancel event to occur
Thread.Sleep(TimeSpan.FromSeconds(15));
acceptor.IsStarted.Should().BeFalse();
}
我建议您构建您的 FixAcceptor.Run()
方法有点不同
public async Task Run(CancellationToken cancellationToken, TimeSpan startupDelay)
{
var task = Task.Run(async () =>
{
try
{
cancellationToken.ThrowIfCancellationRequested();
this.acceptor.Start();
this.IsStarted = true;
while (true)
{
// Stop if token has been canceled
if (cancellationToken.IsCancellationRequested)
{
this.acceptor.Stop();
this.IsStarted = false;
cancellationToken.ThrowIfCancellationRequested();
}
// Save some CPU cycles
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
catch (OperationCanceledException e)
{
Debut.WriteLine(e.Message);
}
}, cancellationToken);
await Task.Delay(startupDelay);
}
所以异常处理在内部任务中,Run
方法 returns 一个 Task
在 startupDelay
之后完成。
(我还把 Thread.Sleep()
换成了 Task.Delay()
)
然后在测试方法中,您可以等待 Run
Task
[Fact]
public async Task Should_Run_Acceptor_And_Stop_By_CancelationToken()
{
// Arrange
var acceptor = new FixAcceptor(new FixAcceptorSettings("test_acceptor.cfg", this.logger));
var tokenSource = new CancellationTokenSource();
// Act
tokenSource.CancelAfter(TimeSpan.FromSeconds(10));
await acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));
// Assert
acceptor.IsStarted.Should().BeTrue();
IsListeningOnTcpPort(9823).Should().BeTrue();
// Wait for cancel event to occur
Thread.Sleep(TimeSpan.FromSeconds(15));
acceptor.IsStarted.Should().BeFalse();
}
制作方法应该没问题async
(它像你使用 xunit 一样接缝)
不推荐通过添加时间延迟来实现确定性。您可以通过使用 TaskCompletionSource
控制任务在恰当的时刻完成来实现 100% 的确定性:
public Task<bool> Start(CancellationToken cancellationToken)
{
var startTcs = new TaskCompletionSource<bool>();
var task = Task.Run(() =>
{
cancellationToken.ThrowIfCancellationRequested();
this.acceptor.Start();
this.IsStarted = true;
startTcs.TrySetResult(true); // Signal that the starting phase is completed
while (true)
{
// ...
}
}, cancellationToken);
HandleTaskCompletion();
return startTcs.Task;
async void HandleTaskCompletion() // async void method = should never throw
{
try
{
await task;
}
catch (OperationCanceledException ex)
{
Debug.WriteLine(ex.Message);
startTcs.TrySetResult(false); // Signal that start failed
}
catch
{
startTcs.TrySetResult(false); // Signal that start failed
}
}
}
然后在你的测试中替换这一行:
acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));
...有了这个:
bool startResult = await acceptor.Start(tokenSource.Token);
引起我注意的另一个问题是 bool IsStarted
属性,它从一个线程变异并被另一个线程观察到,没有同步。这不是一个真正的问题,因为您可以依赖在每个 await
上自动插入的未记录的内存屏障,并且非常有信心您不会遇到可见性问题,但是如果您想要更加确定,您可以使用 lock
(最稳健)同步访问,或使用 volatile
私有字段备份 属性,如下所示:
private volatile bool _isStarted;
public bool IsStarted => _isStarted;