使用 TPL 以通过使用 continueWith 子句来制定连续的操作顺序
Using TPL in order to make sequential order of actions by using continueWith clause
首先我会解释我想做什么。
我有一个 组件 A,它正在使用一个 组件 B。
为了两者之间的通信,我需要使用事件。
我的先决条件之一是让 组件 B 运行 异步并以 顺序排列 运行 事件处理程序 他们被召唤了。
此外,我想取消调用管道(当用户要求时)。因此,所有调用的尚未执行的事件处理程序将永远不会执行。
要实现的解决方案是 TPL。我对我正在尝试做的事情进行了 POC:
static void Main(string[] args)
{
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var t = Task.Factory.StartNew(() => DoSomeWork(token));
//.ContinueWith((prevTask) => DoSomeWork(token));
t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t);
Console.WriteLine("Finish");
Console.ReadKey();
}
static int id = 1;
static void DoSomeWork(CancellationToken ct)
{
ct.ThrowIfCancellationRequested();
Thread.Sleep(1000);
Console.WriteLine(id++);
}
这段代码的输出是:
1
Finish
2
如您所见,它在实际完成之前就完成了。在 Finish.
后显示 2
如果我用这个修改之前的代码,它就可以工作了:
static void Main(string[] args)
{
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var t = Task.Factory.StartNew(() => DoSomeWork(token))
.ContinueWith((prevTask) => DoSomeWork(token));
//t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t);
Console.WriteLine("Finish");
Console.ReadKey();
}
static int id = 1;
static void DoSomeWork(CancellationToken ct)
{
ct.ThrowIfCancellationRequested();
Thread.Sleep(1000);
Console.WriteLine(id++);
}
这段代码的输出是:
1
2
Finish
如您所知,我不需要在任务声明中使用 continueWith 语句,而是在引发事件时使用。
为什么Task.WaitAll(t);第一个样本不起作用?
有人能帮我吗?
在 C# 中进行异步编码的正确方法是使用 await
关键字。
public async Task DoLotsOfWork()
{
await DoSomeWorkAsync();
await DoSomeMoreWorkAsync();
Console.WriteLine("Finish");
}
从控制台应用程序 运行 宁该代码会遇到一些问题,所以我建议您使用@StephenCleary 的 Task.AsyncEx
库。
https://www.nuget.org/packages/Nito.AsyncEx/
你这样用
public void Main()
{
AsyncContext.Run(DoLotsOfWork);
}
更进一步。使用 Task.Run
(或更糟的是 Task.Factory.StartNew
)方法的理由很少。这些 运行 您的方法在后台作为 Threadpool 的一部分工作。
例如
private static async Task DoSomeWorkAsync(CancellationToken ct)
{
await Task.Delay(TimeSpan.FromMilliseconds(1000), ct);
Console.WriteLine(id++);
}
这不会 运行 在任何线程上(因此不会阻塞任何线程)。而是创建了一个 timer/callback 来使主线程在 1000 毫秒后 return 到第二行
编辑:要动态执行此操作也非常简单
public async Task DoLotsOfWork(IEnumerable<Func<Task>> tasks)
{
foreach(var task in tasks)
await task();
Console.WriteLine("Finished");
}
但是,如果您询问使用糟糕的 EAP 模式的方法,我建议您使用 Rx 的 Observable.FromEventPattern
辅助函数。
public async Task SendEmail(MailMessage message)
{
using(var smtp = new SmtpClient())
{
smtp.SendAsync(message);
await Observable.FromEventPattern<>(x => smtp.SendCompleted +=x, x => smtp.SendCompleted -=x)
.ToTask()
}
}
进一步编辑:
public class Publisher
{
public IObservable<CancelationToken> SomeEvent {get;}
}
public abstract class Subscriber
{
public abstract IObservable<CancelationToken> Subscribe(IObservable<CancelationToken> observable);
}
IEnumerable<Subscriber> subscribers = ...
Publisher publisher = ...
IDisposable subscription = subscribers.Aggregate(publisher.SomeEvent, (e, sub) => sub.Subscribe(e)).Subscribe();
//Dispose the subscription when you want to stop listening.
最初的问题是您正在创建两个任务,但只在等待一个任务。
// t is the "first" task
var t = Task.Factory.StartNew(() => DoSomeWork(token));
// the continuation task is not assigned
t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t); // <-- wait only on "t", which is the first task
Console.WriteLine("Finish"); // when the first task finishes, this gets printed
// now the continuation task is executing, but you are not waiting for it
第二个片段发生的事情是您正在等待继续任务,因此它将等到它完成
// t is now the continuation task
var t = Task.Factory.StartNew(() => DoSomeWork(token))
.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t); // <-- wait till the continuation task has finished
所以,第二种方法是可以的,但是如果你想要更精细的控制,只需分配一个任务变量来等待继续任务:
// t is the "first" task
var t = Task.Factory.StartNew(() => DoSomeWork(token));
// The continuation task is assigned to "t2"
var t2 = t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(new [] { t, t2 } ); // <-- wait for all tasks
Console.WriteLine("Finish");
注意:我已经按照您的示例代码进行操作,但是 WaitAll
没有将单个任务作为参数(它需要一组任务), 所以这可能无法编译。您可以使用 Task.Wait
或将数组传递给 WaitAll
首先我会解释我想做什么。
我有一个 组件 A,它正在使用一个 组件 B。
为了两者之间的通信,我需要使用事件。
我的先决条件之一是让 组件 B 运行 异步并以 顺序排列 运行 事件处理程序 他们被召唤了。
此外,我想取消调用管道(当用户要求时)。因此,所有调用的尚未执行的事件处理程序将永远不会执行。
要实现的解决方案是 TPL。我对我正在尝试做的事情进行了 POC:
static void Main(string[] args)
{
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var t = Task.Factory.StartNew(() => DoSomeWork(token));
//.ContinueWith((prevTask) => DoSomeWork(token));
t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t);
Console.WriteLine("Finish");
Console.ReadKey();
}
static int id = 1;
static void DoSomeWork(CancellationToken ct)
{
ct.ThrowIfCancellationRequested();
Thread.Sleep(1000);
Console.WriteLine(id++);
}
这段代码的输出是:
1
Finish
2
如您所见,它在实际完成之前就完成了。在 Finish.
后显示 2如果我用这个修改之前的代码,它就可以工作了:
static void Main(string[] args)
{
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var t = Task.Factory.StartNew(() => DoSomeWork(token))
.ContinueWith((prevTask) => DoSomeWork(token));
//t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t);
Console.WriteLine("Finish");
Console.ReadKey();
}
static int id = 1;
static void DoSomeWork(CancellationToken ct)
{
ct.ThrowIfCancellationRequested();
Thread.Sleep(1000);
Console.WriteLine(id++);
}
这段代码的输出是:
1
2
Finish
如您所知,我不需要在任务声明中使用 continueWith 语句,而是在引发事件时使用。
为什么Task.WaitAll(t);第一个样本不起作用?
有人能帮我吗?
在 C# 中进行异步编码的正确方法是使用 await
关键字。
public async Task DoLotsOfWork()
{
await DoSomeWorkAsync();
await DoSomeMoreWorkAsync();
Console.WriteLine("Finish");
}
从控制台应用程序 运行 宁该代码会遇到一些问题,所以我建议您使用@StephenCleary 的 Task.AsyncEx
库。
https://www.nuget.org/packages/Nito.AsyncEx/
你这样用
public void Main()
{
AsyncContext.Run(DoLotsOfWork);
}
更进一步。使用 Task.Run
(或更糟的是 Task.Factory.StartNew
)方法的理由很少。这些 运行 您的方法在后台作为 Threadpool 的一部分工作。
例如
private static async Task DoSomeWorkAsync(CancellationToken ct)
{
await Task.Delay(TimeSpan.FromMilliseconds(1000), ct);
Console.WriteLine(id++);
}
这不会 运行 在任何线程上(因此不会阻塞任何线程)。而是创建了一个 timer/callback 来使主线程在 1000 毫秒后 return 到第二行
编辑:要动态执行此操作也非常简单
public async Task DoLotsOfWork(IEnumerable<Func<Task>> tasks)
{
foreach(var task in tasks)
await task();
Console.WriteLine("Finished");
}
但是,如果您询问使用糟糕的 EAP 模式的方法,我建议您使用 Rx 的 Observable.FromEventPattern
辅助函数。
public async Task SendEmail(MailMessage message)
{
using(var smtp = new SmtpClient())
{
smtp.SendAsync(message);
await Observable.FromEventPattern<>(x => smtp.SendCompleted +=x, x => smtp.SendCompleted -=x)
.ToTask()
}
}
进一步编辑:
public class Publisher
{
public IObservable<CancelationToken> SomeEvent {get;}
}
public abstract class Subscriber
{
public abstract IObservable<CancelationToken> Subscribe(IObservable<CancelationToken> observable);
}
IEnumerable<Subscriber> subscribers = ...
Publisher publisher = ...
IDisposable subscription = subscribers.Aggregate(publisher.SomeEvent, (e, sub) => sub.Subscribe(e)).Subscribe();
//Dispose the subscription when you want to stop listening.
最初的问题是您正在创建两个任务,但只在等待一个任务。
// t is the "first" task
var t = Task.Factory.StartNew(() => DoSomeWork(token));
// the continuation task is not assigned
t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t); // <-- wait only on "t", which is the first task
Console.WriteLine("Finish"); // when the first task finishes, this gets printed
// now the continuation task is executing, but you are not waiting for it
第二个片段发生的事情是您正在等待继续任务,因此它将等到它完成
// t is now the continuation task
var t = Task.Factory.StartNew(() => DoSomeWork(token))
.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(t); // <-- wait till the continuation task has finished
所以,第二种方法是可以的,但是如果你想要更精细的控制,只需分配一个任务变量来等待继续任务:
// t is the "first" task
var t = Task.Factory.StartNew(() => DoSomeWork(token));
// The continuation task is assigned to "t2"
var t2 = t.ContinueWith((prevTask) => DoSomeWork(token));
Task.WaitAll(new [] { t, t2 } ); // <-- wait for all tasks
Console.WriteLine("Finish");
注意:我已经按照您的示例代码进行操作,但是 WaitAll
没有将单个任务作为参数(它需要一组任务), 所以这可能无法编译。您可以使用 Task.Wait
或将数组传递给 WaitAll