将阻塞调用包装为异步以实现更好的线程重用和响应 UI
Wrapping blocking calls to be async for better thread reuse and responsive UI
我有一个 class 负责通过调用遗留 class 来检索产品可用性。这个遗留 class 本身通过进行 BLOCKING 网络调用在内部收集产品数据。
请注意,我无法修改遗留 API 的代码。由于所有产品都是相互独立的,我想在不创建任何不必要的线程的情况下并行收集信息,也不阻塞在调用这个遗留 API 时被阻塞的线程。有了这个背景,这里是我的基本 classes.
class Product
{
public int ID { get; set; }
public int VendorID { get; set; }
public string Name { get; set; }
}
class ProductSearchResult
{
public int ID { get; set; }
public int AvailableQuantity { get; set; }
public DateTime ShipDate { get; set; }
public bool Success { get; set; }
public string Error { get; set; }
}
class ProductProcessor
{
List<Product> products;
private static readonly SemaphoreSlim mutex = new SemaphoreSlim(2);
CancellationTokenSource cts = new CancellationTokenSource();
public ProductProcessor()
{
products = new List<Product>()
{
new Product() { ID = 1, VendorID = 100, Name = "PC" },
new Product() { ID = 2, VendorID = 101, Name = "Tablet" },
new Product() { ID = 3, VendorID = 100, Name = "Laptop" },
new Product() { ID = 4, VendorID = 102, Name = "GPS" },
new Product() { ID = 5, VendorID = 107, Name = "Mars Rover" }
};
}
public async void Start()
{
Task<ProductSearchResult>[] tasks = new Task<ProductSearchResult>[products.Count];
Parallel.For(0, products.Count(), async i =>
{
tasks[i] = RetrieveProductAvailablity(products[i].ID, cts.Token);
});
Task<ProductSearchResult> results = await Task.WhenAny(tasks);
// Logic for waiting on indiviaul tasks and reporting results
}
private async Task<ProductSearchResult> RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
{
ProductSearchResult result = new ProductSearchResult();
result.ID = productId;
if (cancellationToken.IsCancellationRequested)
{
result.Success = false;
result.Error = "Cancelled.";
return result;
}
try
{
await mutex.WaitAsync();
if (cancellationToken.IsCancellationRequested)
{
result.Success = false;
result.Error = "Cancelled.";
return result;
}
LegacyApp app = new LegacyApp();
bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));
if (success)
{
result.Success = success;
result.AvailableQuantity = app.AvailableQuantity;
result.ShipDate = app.ShipDate;
}
else
{
result.Success = false;
result.Error = app.Error;
}
}
finally
{
mutex.Release();
}
return result;
}
}
鉴于我正在尝试将异步包装在同步 API 上,我有两个问题。
- 通过使用 Parallel.For 并将 Legay API 调用包装在 Task.Run 中,我是否创建了任何不必要的线程,这些线程本可以避免,因为我们将使用它UI.
中的代码
- 这段代码看起来仍然是线程安全的。
am I creating any unnecessary threads that could have been avoided
without blocking calling thread as we will use this code in UI.
是的。您的代码通过 Parallel.ForEach
旋转新线程,然后在 RetrieveProductAvailablity
内部再次旋转。没有那个必要。
async-await
和 Parallel.ForEach
don't really play nice together,因为它将您的异步 lambda 转换为 async void
方法而不是 async Task
.
我建议放弃 Parallel.ForEach
和包装的同步调用并执行以下操作:
将您的方法调用从异步更改为同步(因为它实际上根本不是异步):
private ProductSearchResult RetrieveProductAvailablity(int productId,
CancellationToken
cancellationToken)
而不是这个:
bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));
同步调用方法调用:
bool success = app.RetrieveProductAvailability(productId));
然后对所有这些显式调用 Task.Run
:
var productTasks = products.Select(product => Task.Run(() =>
RetrieveProductAvailablity(product.ID, cts.Token))
await Task.WhenAll(productTasks);
编译器将向您发出有关 async
lambda 的警告。仔细阅读;它告诉你它不是异步的。在那里使用 async
没有意义。另外,不要使用 async void
.
由于您的底层 API 正在阻塞 - 并且无法更改它 - 异步代码不是一个选项。我建议使用多个 Task.Run
调用 或 Parallel.For
,但不要同时使用两者。所以让我们使用并行。实际上,让我们使用并行 LINQ,因为您要转换 一个序列。
使 RetrieveProductAvailablity
异步是没有意义的;除了节流,它只做阻塞工作,并行方法有更自然的节流支持。这使您的方法看起来像:
private ProductSearchResult RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
{
... // no mutex code
LegacyApp app = new LegacyApp();
bool success = app.RetrieveProductAvailability(productId);
... // no mutex code
}
然后您可以这样进行并行处理:
public void Start()
{
ProductSearchResult[] results = products.AsParallel().AsOrdered()
.WithCancellation(cts.Token).WithDegreeOfParallelism(2)
.Select(product => RetrieveProductAvailability(product.ID, cts.Token))
.ToArray();
// Logic for waiting on indiviaul tasks and reporting results
}
在您的 UI 线程中,您可以使用 Task.Run
:
调用 方法
async void MyUiEventHandler(...)
{
await Task.Run(() => processor.Start());
}
这可以使您的业务逻辑保持干净(仅 synchronous/parallel 代码),将这项工作移出 UI 线程(使用 Task.Run
)的责任属于 UI层.
更新: 我添加了对 AsOrdered
的调用以确保结果数组与产品序列具有相同的顺序。这可能是必要的,也可能不是必要的,但由于原始代码保留了顺序,所以这段代码现在也是如此。
更新: 由于您需要在每次检索后更新 UI,因此您应该对每个检索使用 Task.Run
而不是 AsParallel
:
public async Task Start()
{
var tasks = products.Select(product =>
ProcessAvailabilityAsync(product.ID, cts.Token));
await Task.WhenAll(tasks);
}
private SemaphoreSlim mutex = new SempahoreSlim(2);
private async Task ProcessAvailabilityAsync(int id, CancellationToken token)
{
await mutex.WaitAsync();
try
{
var result = await RetrieveProductAvailability(id, token);
// Logic for reporting results
}
finally
{
mutex.Release();
}
}
我有一个 class 负责通过调用遗留 class 来检索产品可用性。这个遗留 class 本身通过进行 BLOCKING 网络调用在内部收集产品数据。 请注意,我无法修改遗留 API 的代码。由于所有产品都是相互独立的,我想在不创建任何不必要的线程的情况下并行收集信息,也不阻塞在调用这个遗留 API 时被阻塞的线程。有了这个背景,这里是我的基本 classes.
class Product
{
public int ID { get; set; }
public int VendorID { get; set; }
public string Name { get; set; }
}
class ProductSearchResult
{
public int ID { get; set; }
public int AvailableQuantity { get; set; }
public DateTime ShipDate { get; set; }
public bool Success { get; set; }
public string Error { get; set; }
}
class ProductProcessor
{
List<Product> products;
private static readonly SemaphoreSlim mutex = new SemaphoreSlim(2);
CancellationTokenSource cts = new CancellationTokenSource();
public ProductProcessor()
{
products = new List<Product>()
{
new Product() { ID = 1, VendorID = 100, Name = "PC" },
new Product() { ID = 2, VendorID = 101, Name = "Tablet" },
new Product() { ID = 3, VendorID = 100, Name = "Laptop" },
new Product() { ID = 4, VendorID = 102, Name = "GPS" },
new Product() { ID = 5, VendorID = 107, Name = "Mars Rover" }
};
}
public async void Start()
{
Task<ProductSearchResult>[] tasks = new Task<ProductSearchResult>[products.Count];
Parallel.For(0, products.Count(), async i =>
{
tasks[i] = RetrieveProductAvailablity(products[i].ID, cts.Token);
});
Task<ProductSearchResult> results = await Task.WhenAny(tasks);
// Logic for waiting on indiviaul tasks and reporting results
}
private async Task<ProductSearchResult> RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
{
ProductSearchResult result = new ProductSearchResult();
result.ID = productId;
if (cancellationToken.IsCancellationRequested)
{
result.Success = false;
result.Error = "Cancelled.";
return result;
}
try
{
await mutex.WaitAsync();
if (cancellationToken.IsCancellationRequested)
{
result.Success = false;
result.Error = "Cancelled.";
return result;
}
LegacyApp app = new LegacyApp();
bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));
if (success)
{
result.Success = success;
result.AvailableQuantity = app.AvailableQuantity;
result.ShipDate = app.ShipDate;
}
else
{
result.Success = false;
result.Error = app.Error;
}
}
finally
{
mutex.Release();
}
return result;
}
}
鉴于我正在尝试将异步包装在同步 API 上,我有两个问题。
- 通过使用 Parallel.For 并将 Legay API 调用包装在 Task.Run 中,我是否创建了任何不必要的线程,这些线程本可以避免,因为我们将使用它UI. 中的代码
- 这段代码看起来仍然是线程安全的。
am I creating any unnecessary threads that could have been avoided without blocking calling thread as we will use this code in UI.
是的。您的代码通过 Parallel.ForEach
旋转新线程,然后在 RetrieveProductAvailablity
内部再次旋转。没有那个必要。
async-await
和 Parallel.ForEach
don't really play nice together,因为它将您的异步 lambda 转换为 async void
方法而不是 async Task
.
我建议放弃 Parallel.ForEach
和包装的同步调用并执行以下操作:
将您的方法调用从异步更改为同步(因为它实际上根本不是异步):
private ProductSearchResult RetrieveProductAvailablity(int productId,
CancellationToken
cancellationToken)
而不是这个:
bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));
同步调用方法调用:
bool success = app.RetrieveProductAvailability(productId));
然后对所有这些显式调用 Task.Run
:
var productTasks = products.Select(product => Task.Run(() =>
RetrieveProductAvailablity(product.ID, cts.Token))
await Task.WhenAll(productTasks);
编译器将向您发出有关 async
lambda 的警告。仔细阅读;它告诉你它不是异步的。在那里使用 async
没有意义。另外,不要使用 async void
.
由于您的底层 API 正在阻塞 - 并且无法更改它 - 异步代码不是一个选项。我建议使用多个 Task.Run
调用 或 Parallel.For
,但不要同时使用两者。所以让我们使用并行。实际上,让我们使用并行 LINQ,因为您要转换 一个序列。
使 RetrieveProductAvailablity
异步是没有意义的;除了节流,它只做阻塞工作,并行方法有更自然的节流支持。这使您的方法看起来像:
private ProductSearchResult RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
{
... // no mutex code
LegacyApp app = new LegacyApp();
bool success = app.RetrieveProductAvailability(productId);
... // no mutex code
}
然后您可以这样进行并行处理:
public void Start()
{
ProductSearchResult[] results = products.AsParallel().AsOrdered()
.WithCancellation(cts.Token).WithDegreeOfParallelism(2)
.Select(product => RetrieveProductAvailability(product.ID, cts.Token))
.ToArray();
// Logic for waiting on indiviaul tasks and reporting results
}
在您的 UI 线程中,您可以使用 Task.Run
:
async void MyUiEventHandler(...)
{
await Task.Run(() => processor.Start());
}
这可以使您的业务逻辑保持干净(仅 synchronous/parallel 代码),将这项工作移出 UI 线程(使用 Task.Run
)的责任属于 UI层.
更新: 我添加了对 AsOrdered
的调用以确保结果数组与产品序列具有相同的顺序。这可能是必要的,也可能不是必要的,但由于原始代码保留了顺序,所以这段代码现在也是如此。
更新: 由于您需要在每次检索后更新 UI,因此您应该对每个检索使用 Task.Run
而不是 AsParallel
:
public async Task Start()
{
var tasks = products.Select(product =>
ProcessAvailabilityAsync(product.ID, cts.Token));
await Task.WhenAll(tasks);
}
private SemaphoreSlim mutex = new SempahoreSlim(2);
private async Task ProcessAvailabilityAsync(int id, CancellationToken token)
{
await mutex.WaitAsync();
try
{
var result = await RetrieveProductAvailability(id, token);
// Logic for reporting results
}
finally
{
mutex.Release();
}
}