在进行长 运行 操作之前立即取消操作?
Cancel operation right away before going through long running operations?
我按照以下方式使用 AsParallel 结合 WithDegreeOfParallelism 和 WithCancellation
AsParallel().WithCancellation(cs.Token).WithDegreeOfParallelism(2)
这是我对此的理解。一次只会处理两个传入序列。一旦其中一个请求完成,将处理更多项目。但是,如果取消
请求被启动,而不是来自传入队列的那些尚未被拾取的项目将被处理。基于这种理解,我创建了以下代码。
class Employee
{
public int ID { get; set;}
public string FirstName { get; set;}
public string LastName { get; set;}
}
class Program
{
private static List<Employee> _Employees;
static CancellationTokenSource cs = new CancellationTokenSource();
static Random rand = new Random();
static void Main(string[] args)
{
_Employees = new List<Employee>()
{
new Employee() { ID = 1, FirstName = "John", LastName = "Doe" },
new Employee() { ID = 2, FirstName = "Peter", LastName = "Saul" },
new Employee() { ID = 3, FirstName = "Mike", LastName = "Sue" },
new Employee() { ID = 4, FirstName = "Catherina", LastName = "Desoza" },
new Employee() { ID = 5, FirstName = "Paul", LastName = "Smith" },
new Employee() { ID = 6, FirstName = "Paul2", LastName = "Smith" },
new Employee() { ID = 7, FirstName = "Paul3", LastName = "Smith" },
new Employee() { ID = 8, FirstName = "Paul4", LastName = "Smith" },
new Employee() { ID = 9, FirstName = "Paul5", LastName = "Smith" },
new Employee() { ID = 10, FirstName = "Paul6", LastName = "Smith" },
new Employee() { ID = 5, FirstName = "Paul7", LastName = "Smith" }
};
try
{
var tasks = _Employees.AsParallel().WithCancellation(cs.Token).WithDegreeOfParallelism(2).Select(x => ProcessThisEmployee(x, cs.Token)).ToArray();
Console.WriteLine("Now waiting");
Thread.Sleep(1000);
cs.Cancel();
Task.WaitAll(tasks);
}
catch (AggregateException ae)
{
// error handling code
Console.WriteLine("something bad happened");
}
catch (Exception ex)
{
// error handling code
Console.WriteLine("something even worst happened");
}
// other stuff
Console.WriteLine("All Done");
}
private static async Task ProcessThisEmployee(Employee x, CancellationToken token)
{
if (token.IsCancellationRequested)
{
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", System.Threading.Thread.CurrentThread.ManagedThreadId));
return;
}
int Sleep = rand.Next(800, 2000);
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Sleeping for {2}", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID, Sleep));
await TaskEx.Run(() => System.Threading.Thread.Sleep(Sleep));
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} finished", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
}
}
这是我运行时的输出。
ThreadID = 3 -> Employee 1 -> Sleeping for 1058
ThreadID = 1 -> Employee 7 -> Sleeping for 1187
ThreadID = 1 -> Employee 8 -> Sleeping for 1296
ThreadID = 1 -> Employee 9 -> Sleeping for 1614
ThreadID = 1 -> Employee 10 -> Sleeping for 1607
ThreadID = 1 -> Employee 5 -> Sleeping for 1928
ThreadID = 3 -> Employee 2 -> Sleeping for 1487
ThreadID = 3 -> Employee 3 -> Sleeping for 1535
ThreadID = 3 -> Employee 4 -> Sleeping for 1265
ThreadID = 3 -> Employee 5 -> Sleeping for 1248
ThreadID = 3 -> Employee 6 -> Sleeping for 807
Now waiting
ThreadID = 3 -> Employee 6 finished
ThreadID = 4 -> Employee 1 finished
ThreadID = 5 -> Employee 7 finished
ThreadID = 6 -> Employee 8 finished
ThreadID = 3 -> Employee 5 finished
ThreadID = 4 -> Employee 9 finished
ThreadID = 5 -> Employee 10 finished
ThreadID = 6 -> Employee 5 finished
ThreadID = 3 -> Employee 4 finished
ThreadID = 7 -> Employee 2 finished
ThreadID = 8 -> Employee 3 finished
All Done
这是我的问题(根据我对事物的理解)。
- 我原以为对于某些员工 ProcessThisEmployee 根本不会被调用,因为它会被取消,但它会为所有员工调用
即使调用 ProcessThisEmployee 方法,它也会通过以下代码路径,这也不会发生
if ( token.IsCancellationRequested )
{
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled",System.Threading.Thread.CurrentThread.ManagedThreadId));
return;
}
然后我改变了ProcessThisEmployee,基本上把token.IsCancellationRequested消息移动到Sleep之后如下。
private static async Task ProcessThisEmployee(Employee x, CancellationToken token)
{
int Sleep = rand.Next(800, 2000);
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Sleeping for {2}", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID, Sleep));
await TaskEx.Run(() => System.Threading.Thread.Sleep(Sleep));
if (token.IsCancellationRequested)
{
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", System.Threading.Thread.CurrentThread.ManagedThreadId));
return;
}
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} finished", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
}
现在我得到以下输出。
ThreadID = 3 -> Employee 1 -> Sleeping for 1330
ThreadID = 1 -> Employee 7 -> Sleeping for 1868
ThreadID = 3 -> Employee 2 -> Sleeping for 903
ThreadID = 3 -> Employee 3 -> Sleeping for 1241
ThreadID = 3 -> Employee 4 -> Sleeping for 1367
ThreadID = 3 -> Employee 5 -> Sleeping for 1007
ThreadID = 3 -> Employee 6 -> Sleeping for 923
ThreadID = 1 -> Employee 8 -> Sleeping for 1032
ThreadID = 1 -> Employee 9 -> Sleeping for 1948
ThreadID = 1 -> Employee 10 -> Sleeping for 1456
ThreadID = 1 -> Employee 5 -> Sleeping for 1737
Now waiting
ThreadID = 5 -> Employee 2 finished
ThreadID = 3 -> Employee 6 finished
something bad happened
All Done
我的问题是我对这个工作流程有什么误解。我基本上想尽快取消操作,而不需要经过长时间的 运行 操作(在这种情况下,睡眠只是一个例子,但它可能非常昂贵)
该代码存在一些问题:
1.) ToArray()
实现序列,即它只会 return 在来自源序列的所有输入都经过 Select(...)
.
之后
因为你之后调用 cs.Cancel()
它不会在 ProcessThisEmployee
开始时立即触发 token.IsCancellationRequested
2.) WithDegreeOfParallelism(2).Select(x => ProcessThisEmployee(x, cs.Token))
看起来不错,但实际上并没有真正按照您的意愿去做,因为 ProcessThisEmployee
是一种异步方法,一旦 return第一个 return 或到达第一个 await.
您可能想要做的是仅以 2 个并行度执行长 运行ning ProcessThisEmployee
方法。你实际上做的是创建一堆只有 2 个并行度的 Tasks
。之后任务本身全部运行并发。
我不知道如何针对您的具体情况解决此问题,因为我不知道上下文。但也许这已经对你有所帮助了。
更新以回复您的评论:我正在执行 ToArray 和 ProcessThisEmployee 是一种异步方法,因为此代码将成为库的一部分并且可以从 WPF 应用程序中使用。最终用户可能想要在 UI 上提供更新,所以我不想在操作完成之前阻止 (john smith)
不要为本质上不是异步的事物编写异步包装器,即主要是文件、网络或数据库访问。如果使用库的开发人员想要在异步上下文中调用某些东西,他仍然可以执行 await Task.Run(...)
。有关这方面的更多信息,您可以查看这篇关于您是否 should expose asynchronous wrappers for synchronous methods.
的文章
在我看来,如果您已经有一个可用的 LINQ 查询并希望加快它的速度,那么 PLINQ 最有用,因为该查询适合并行处理。
在您的情况下,最简单的方法可能是使用 2 个线程的工作队列。我很确定网络上有这样的例子。
我按照以下方式使用 AsParallel 结合 WithDegreeOfParallelism 和 WithCancellation
AsParallel().WithCancellation(cs.Token).WithDegreeOfParallelism(2)
这是我对此的理解。一次只会处理两个传入序列。一旦其中一个请求完成,将处理更多项目。但是,如果取消 请求被启动,而不是来自传入队列的那些尚未被拾取的项目将被处理。基于这种理解,我创建了以下代码。
class Employee
{
public int ID { get; set;}
public string FirstName { get; set;}
public string LastName { get; set;}
}
class Program
{
private static List<Employee> _Employees;
static CancellationTokenSource cs = new CancellationTokenSource();
static Random rand = new Random();
static void Main(string[] args)
{
_Employees = new List<Employee>()
{
new Employee() { ID = 1, FirstName = "John", LastName = "Doe" },
new Employee() { ID = 2, FirstName = "Peter", LastName = "Saul" },
new Employee() { ID = 3, FirstName = "Mike", LastName = "Sue" },
new Employee() { ID = 4, FirstName = "Catherina", LastName = "Desoza" },
new Employee() { ID = 5, FirstName = "Paul", LastName = "Smith" },
new Employee() { ID = 6, FirstName = "Paul2", LastName = "Smith" },
new Employee() { ID = 7, FirstName = "Paul3", LastName = "Smith" },
new Employee() { ID = 8, FirstName = "Paul4", LastName = "Smith" },
new Employee() { ID = 9, FirstName = "Paul5", LastName = "Smith" },
new Employee() { ID = 10, FirstName = "Paul6", LastName = "Smith" },
new Employee() { ID = 5, FirstName = "Paul7", LastName = "Smith" }
};
try
{
var tasks = _Employees.AsParallel().WithCancellation(cs.Token).WithDegreeOfParallelism(2).Select(x => ProcessThisEmployee(x, cs.Token)).ToArray();
Console.WriteLine("Now waiting");
Thread.Sleep(1000);
cs.Cancel();
Task.WaitAll(tasks);
}
catch (AggregateException ae)
{
// error handling code
Console.WriteLine("something bad happened");
}
catch (Exception ex)
{
// error handling code
Console.WriteLine("something even worst happened");
}
// other stuff
Console.WriteLine("All Done");
}
private static async Task ProcessThisEmployee(Employee x, CancellationToken token)
{
if (token.IsCancellationRequested)
{
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", System.Threading.Thread.CurrentThread.ManagedThreadId));
return;
}
int Sleep = rand.Next(800, 2000);
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Sleeping for {2}", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID, Sleep));
await TaskEx.Run(() => System.Threading.Thread.Sleep(Sleep));
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} finished", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
}
}
这是我运行时的输出。
ThreadID = 3 -> Employee 1 -> Sleeping for 1058
ThreadID = 1 -> Employee 7 -> Sleeping for 1187
ThreadID = 1 -> Employee 8 -> Sleeping for 1296
ThreadID = 1 -> Employee 9 -> Sleeping for 1614
ThreadID = 1 -> Employee 10 -> Sleeping for 1607
ThreadID = 1 -> Employee 5 -> Sleeping for 1928
ThreadID = 3 -> Employee 2 -> Sleeping for 1487
ThreadID = 3 -> Employee 3 -> Sleeping for 1535
ThreadID = 3 -> Employee 4 -> Sleeping for 1265
ThreadID = 3 -> Employee 5 -> Sleeping for 1248
ThreadID = 3 -> Employee 6 -> Sleeping for 807
Now waiting
ThreadID = 3 -> Employee 6 finished
ThreadID = 4 -> Employee 1 finished
ThreadID = 5 -> Employee 7 finished
ThreadID = 6 -> Employee 8 finished
ThreadID = 3 -> Employee 5 finished
ThreadID = 4 -> Employee 9 finished
ThreadID = 5 -> Employee 10 finished
ThreadID = 6 -> Employee 5 finished
ThreadID = 3 -> Employee 4 finished
ThreadID = 7 -> Employee 2 finished
ThreadID = 8 -> Employee 3 finished
All Done
这是我的问题(根据我对事物的理解)。
- 我原以为对于某些员工 ProcessThisEmployee 根本不会被调用,因为它会被取消,但它会为所有员工调用
即使调用 ProcessThisEmployee 方法,它也会通过以下代码路径,这也不会发生
if ( token.IsCancellationRequested ) { Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled",System.Threading.Thread.CurrentThread.ManagedThreadId)); return; }
然后我改变了ProcessThisEmployee,基本上把token.IsCancellationRequested消息移动到Sleep之后如下。
private static async Task ProcessThisEmployee(Employee x, CancellationToken token)
{
int Sleep = rand.Next(800, 2000);
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Sleeping for {2}", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID, Sleep));
await TaskEx.Run(() => System.Threading.Thread.Sleep(Sleep));
if (token.IsCancellationRequested)
{
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", System.Threading.Thread.CurrentThread.ManagedThreadId));
return;
}
Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} finished", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
}
现在我得到以下输出。
ThreadID = 3 -> Employee 1 -> Sleeping for 1330
ThreadID = 1 -> Employee 7 -> Sleeping for 1868
ThreadID = 3 -> Employee 2 -> Sleeping for 903
ThreadID = 3 -> Employee 3 -> Sleeping for 1241
ThreadID = 3 -> Employee 4 -> Sleeping for 1367
ThreadID = 3 -> Employee 5 -> Sleeping for 1007
ThreadID = 3 -> Employee 6 -> Sleeping for 923
ThreadID = 1 -> Employee 8 -> Sleeping for 1032
ThreadID = 1 -> Employee 9 -> Sleeping for 1948
ThreadID = 1 -> Employee 10 -> Sleeping for 1456
ThreadID = 1 -> Employee 5 -> Sleeping for 1737
Now waiting
ThreadID = 5 -> Employee 2 finished
ThreadID = 3 -> Employee 6 finished
something bad happened
All Done
我的问题是我对这个工作流程有什么误解。我基本上想尽快取消操作,而不需要经过长时间的 运行 操作(在这种情况下,睡眠只是一个例子,但它可能非常昂贵)
该代码存在一些问题:
1.) ToArray()
实现序列,即它只会 return 在来自源序列的所有输入都经过 Select(...)
.
因为你之后调用 cs.Cancel()
它不会在 ProcessThisEmployee
token.IsCancellationRequested
2.) WithDegreeOfParallelism(2).Select(x => ProcessThisEmployee(x, cs.Token))
看起来不错,但实际上并没有真正按照您的意愿去做,因为 ProcessThisEmployee
是一种异步方法,一旦 return第一个 return 或到达第一个 await.
您可能想要做的是仅以 2 个并行度执行长 运行ning ProcessThisEmployee
方法。你实际上做的是创建一堆只有 2 个并行度的 Tasks
。之后任务本身全部运行并发。
我不知道如何针对您的具体情况解决此问题,因为我不知道上下文。但也许这已经对你有所帮助了。
更新以回复您的评论:我正在执行 ToArray 和 ProcessThisEmployee 是一种异步方法,因为此代码将成为库的一部分并且可以从 WPF 应用程序中使用。最终用户可能想要在 UI 上提供更新,所以我不想在操作完成之前阻止 (john smith)
不要为本质上不是异步的事物编写异步包装器,即主要是文件、网络或数据库访问。如果使用库的开发人员想要在异步上下文中调用某些东西,他仍然可以执行 await Task.Run(...)
。有关这方面的更多信息,您可以查看这篇关于您是否 should expose asynchronous wrappers for synchronous methods.
在我看来,如果您已经有一个可用的 LINQ 查询并希望加快它的速度,那么 PLINQ 最有用,因为该查询适合并行处理。
在您的情况下,最简单的方法可能是使用 2 个线程的工作队列。我很确定网络上有这样的例子。