我怎样才能在 returns 之前延迟我的任务?

How can I delay my task before it returns?

我对 acceptor.IsStarted.Should().BeTrue(); 的断言(请参阅下面的单元测试)总是失败,因为它被评估得太早了。立即调用 await task returns,没有给 this.acceptor.Start() 足够的时间启动。

我想让 FixAcceptor() 的启动更加确定性,因此引入了参数 TimeSpan startupDelay

但是我根本不知道在哪里以及如何延迟启动。

this.acceptor.Start()this.IsStarted = true 之间放置一个额外的 Thread.Sleep(startupDelay) 不会有帮助,因为它只会阻塞工作任务本身,而不是调用线程。

我希望清楚我想要存档的内容以及我正在努力解决的问题。提前致谢。

public class FixAcceptor
{
    // Type provided by QuickFix.net
    private readonly ThreadedSocketAcceptor acceptor;

    public FixAcceptor(IFixSettings settings)
    {
        // Shortened
    }

    public bool IsStarted { get; private set; }

    public async void Run(CancellationToken cancellationToken, TimeSpan startupDelay)
    {
        var task = Task.Run(() =>
        {
            cancellationToken.ThrowIfCancellationRequested();

            this.acceptor.Start();
            this.IsStarted = true;

            while (true)
            {
                // Stop if token has been canceled
                if (cancellationToken.IsCancellationRequested)
                {
                    this.acceptor.Stop();
                    this.IsStarted = false;

                    cancellationToken.ThrowIfCancellationRequested();
                }

                // Save some CPU cycles
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }

        }, cancellationToken);

        try
        {
            await task;
        }
        catch (OperationCanceledException e)
        {
            Debug.WriteLine(e.Message);
        }
    }
}

以及对应的消费代码

[Fact]
public void Should_Run_Acceptor_And_Stop_By_CancelationToken()
{
    // Arrange
    var acceptor = new FixAcceptor(new FixAcceptorSettings("test_acceptor.cfg", this.logger));
    var tokenSource = new CancellationTokenSource();

    // Act
    tokenSource.CancelAfter(TimeSpan.FromSeconds(10));
    acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));

    // Assert
    acceptor.IsStarted.Should().BeTrue();
    IsListeningOnTcpPort(9823).Should().BeTrue();

    // Wait for cancel event to occur
    Thread.Sleep(TimeSpan.FromSeconds(15));
    acceptor.IsStarted.Should().BeFalse();
}

我建议您构建您的 FixAcceptor.Run() 方法有点不同

public async Task Run(CancellationToken cancellationToken, TimeSpan startupDelay)
{
    var task = Task.Run(async () =>
    {
        try 
        {
            cancellationToken.ThrowIfCancellationRequested();

            this.acceptor.Start();
            this.IsStarted = true;

            while (true)
            {
                // Stop if token has been canceled
                if (cancellationToken.IsCancellationRequested)
                {
                    this.acceptor.Stop();
                    this.IsStarted = false;

                    cancellationToken.ThrowIfCancellationRequested();
                }

                // Save some CPU cycles
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }
        catch (OperationCanceledException e)
        {
            Debut.WriteLine(e.Message);
        }
    }, cancellationToken);

    await Task.Delay(startupDelay);
}

所以异常处理在内部任务中,Run 方法 returns 一个 TaskstartupDelay 之后完成。 (我还把 Thread.Sleep() 换成了 Task.Delay()) 然后在测试方法中,您可以等待 Run

返回的 Task
[Fact]
public async Task Should_Run_Acceptor_And_Stop_By_CancelationToken()
{
    // Arrange
    var acceptor = new FixAcceptor(new FixAcceptorSettings("test_acceptor.cfg", this.logger));
    var tokenSource = new CancellationTokenSource();

    // Act
    tokenSource.CancelAfter(TimeSpan.FromSeconds(10));
    await acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));

    // Assert
    acceptor.IsStarted.Should().BeTrue();
    IsListeningOnTcpPort(9823).Should().BeTrue();

    // Wait for cancel event to occur
    Thread.Sleep(TimeSpan.FromSeconds(15));
    acceptor.IsStarted.Should().BeFalse();
}

制作方法应该没问题async(它像你使用 xunit 一样接缝)

不推荐通过添加时间延迟来实现确定性。您可以通过使用 TaskCompletionSource 控制任务在恰当的时刻完成来实现 100% 的确定性:

public Task<bool> Start(CancellationToken cancellationToken)
{
    var startTcs = new TaskCompletionSource<bool>();
    var task = Task.Run(() =>
    {
        cancellationToken.ThrowIfCancellationRequested();

        this.acceptor.Start();
        this.IsStarted = true;
        startTcs.TrySetResult(true); // Signal that the starting phase is completed

        while (true)
        {
            // ...
        }

    }, cancellationToken);
    HandleTaskCompletion();
    return startTcs.Task;

    async void HandleTaskCompletion() // async void method = should never throw
    {
        try
        {
            await task;
        }
        catch (OperationCanceledException ex)
        {
            Debug.WriteLine(ex.Message);
            startTcs.TrySetResult(false); // Signal that start failed
        }
        catch
        {
            startTcs.TrySetResult(false); // Signal that start failed
        }
    }
}

然后在你的测试中替换这一行:

acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));

...有了这个:

bool startResult = await acceptor.Start(tokenSource.Token);

引起我注意的另一个问题是 bool IsStarted 属性,它从一个线程变异并被另一个线程观察到,没有同步。这不是一个真正的问题,因为您可以依赖在每个 await 上自动插入的未记录的内存屏障,并且非常有信心您不会遇到可见性问题,但是如果您想要更加确定,您可以使用 lock(最稳健)同步访问,或使用 volatile 私有字段备份 属性,如下所示:

private volatile bool _isStarted;
public bool IsStarted => _isStarted;