使用 TPL 以通过使用 continueWith 子句来制定连续的操作顺序

Using TPL in order to make sequential order of actions by using continueWith clause

首先我会解释我想做什么。

我有一个 组件 A,它正在使用一个 组件 B

为了两者之间的通信,我需要使用事件。

我的先决条件之一是让 组件 B 运行 异步并以 顺序排列 运行 事件处理程序 他们被召唤了。

此外,我想取消调用管道(当用户要求时)。因此,所有调用的尚未执行的事件处理程序将永远不会执行。

要实现的解决方案是 TPL。我对我正在尝试做的事情进行了 POC:

    static void Main(string[] args)
    {
        var tokenSource = new CancellationTokenSource();
        var token = tokenSource.Token;

        var t = Task.Factory.StartNew(() => DoSomeWork(token));
                            //.ContinueWith((prevTask) => DoSomeWork(token));

        t.ContinueWith((prevTask) => DoSomeWork(token));

        Task.WaitAll(t);

        Console.WriteLine("Finish");

        Console.ReadKey();
    }

    static int id = 1;
    static void DoSomeWork(CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();

        Thread.Sleep(1000);
        
        Console.WriteLine(id++);
    }

这段代码的输出是:

1

Finish

2

如您所见,它在实际完成之前就完成了。在 Finish.

后显示 2

如果我用这个修改之前的代码,它就可以工作了:

        static void Main(string[] args)
    {
        var tokenSource = new CancellationTokenSource();
        var token = tokenSource.Token;

        var t = Task.Factory.StartNew(() => DoSomeWork(token))
                            .ContinueWith((prevTask) => DoSomeWork(token));

        //t.ContinueWith((prevTask) => DoSomeWork(token));

        Task.WaitAll(t);

        Console.WriteLine("Finish");

        Console.ReadKey();
    }

    static int id = 1;
    static void DoSomeWork(CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();

        Thread.Sleep(1000);
        
        Console.WriteLine(id++);
    }

这段代码的输出是:

1

2

Finish

如您所知,我不需要在任务声明中使用 continueWith 语句,而是在引发事件时使用。

为什么Task.WaitAll(t);第一个样本不起作用?

有人能帮我吗?

在 C# 中进行异步编码的正确方法是使用 await 关键字。

public async Task DoLotsOfWork()
{
    await DoSomeWorkAsync();
    await DoSomeMoreWorkAsync();
    Console.WriteLine("Finish");
}

从控制台应用程序 运行 宁该代码会遇到一些问题,所以我建议您使用@StephenCleary 的 Task.AsyncEx 库。

https://www.nuget.org/packages/Nito.AsyncEx/

你这样用

public void Main()
{
    AsyncContext.Run(DoLotsOfWork);
}

更进一步。使用 Task.Run(或更糟的是 Task.Factory.StartNew)方法的理由很少。这些 运行 您的方法在后台作为 Threadpool 的一部分工作。

例如

private static async Task DoSomeWorkAsync(CancellationToken ct)
{
    await Task.Delay(TimeSpan.FromMilliseconds(1000), ct);
    Console.WriteLine(id++);
}

这不会 运行 在任何线程上(因此不会阻塞任何线程)。而是创建了一个 timer/callback 来使主线程在 1000 毫秒后 return 到第二行

编辑:要动态执行此操作也非常简单

public async Task DoLotsOfWork(IEnumerable<Func<Task>> tasks)
{
    foreach(var task in tasks)
        await task();
    Console.WriteLine("Finished");
}

但是,如果您询问使用糟糕的 EAP 模式的方法,我建议您使用 Rx 的 Observable.FromEventPattern 辅助函数。

public async Task SendEmail(MailMessage message)
{
    using(var smtp = new SmtpClient())
    {
        smtp.SendAsync(message);
        await Observable.FromEventPattern<>(x => smtp.SendCompleted +=x, x => smtp.SendCompleted -=x)
                  .ToTask()
    }
}

进一步编辑:

public class Publisher
{
    public IObservable<CancelationToken> SomeEvent {get;}
}

public abstract class Subscriber
{
    public abstract IObservable<CancelationToken> Subscribe(IObservable<CancelationToken> observable);

}

IEnumerable<Subscriber> subscribers = ...
Publisher publisher = ...

IDisposable subscription = subscribers.Aggregate(publisher.SomeEvent, (e, sub) => sub.Subscribe(e)).Subscribe();

//Dispose the subscription when you want to stop listening.

最初的问题是您正在创建两个任务,但只在等待一个任务。

// t is the "first" task
var t = Task.Factory.StartNew(() => DoSomeWork(token));
// the continuation task is not assigned
t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t); // <-- wait only on "t", which is the first task
Console.WriteLine("Finish"); // when the first task finishes, this gets printed
// now the continuation task is executing, but you are not waiting for it

第二个片段发生的事情是您正在等待继续任务,因此它将等到它完成

// t is now the continuation task
var t = Task.Factory.StartNew(() => DoSomeWork(token))
             .ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t); // <-- wait till the continuation task has finished

所以,第二种方法是可以的,但是如果你想要更精细的控制,只需分配一个任务变量来等待继续任务:

// t is the "first" task
var t = Task.Factory.StartNew(() => DoSomeWork(token));
// The continuation task is assigned to "t2"
var t2 = t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(new [] { t, t2 } ); // <-- wait for all tasks
Console.WriteLine("Finish");

注意:我已经按照您的示例代码进行操作,但是 WaitAll 没有将单个任务作为参数(它需要一组任务), 所以这可能无法编译。您可以使用 Task.Wait 或将数组传递给 WaitAll