对大量 HTTP 请求使用任务
Using Tasks for a lot of HTTP requests
所以我在使用 Tasks 处理大量 HTTP 请求时遇到了一些困难。
我想做的是从 WMTS 创建一个大图像。对于那些不知道的人,WMTS 是一种 Web 地图瓦片服务。
所以基本上,您可以通过使用正确的 tileRow 和 tileColumn 发送请求来请求 256x256 的图像块。所以在这种情况下,我正在尝试构建包含数百甚至数千个这些图像块的图像。
为此,我创建了一个应用程序:
- 根据输入计算需要请求哪些瓦片。
- 创建一个列表,我可以使用它向 WMTS 发出正确的 HTTP 请求。
- 将这些请求发送到服务器并检索图像。
- 将图像拼接成一张大图像。这就是我们想要的结果。
正如您想象的那样,图块的数量呈指数级增长。这并没有真正影响 CPU 工作,但主要是 I/O 绑定工作。
因此,与其等待每个请求发送到 return,然后再发送下一个请求,我认为为此使用 Tasks 是个好主意。创建将处理每个单独请求的任务,并在所有任务完成后构建大图像。
所以这是我已经知道我要请求哪些图块的方法。这里我想递归发送带有任务的请求,直到所有数据都完成(最终采用最大重试机制)。
public Dictionary<Tuple<int, int>, Image> GetTilesParallel(List<Tuple<int, int>> tileMatrix, int retry = 0)
{
//The dictionary we will return
Dictionary<Tuple<int, int>, Image> images = new Dictionary<Tuple<int, int>, Image>();
//The dictionary that we will recursively request if tiles fail.
List<Tuple<int, int>> failedTiles = new List<Tuple<int, int>>();
//To track when tasks are finished
List<Task> tasks = new List<Task>();
foreach (var request in tileMatrix)
{
Tuple<int, int> imageTile = new Tuple<int, int>(request.Item1, request.Item2);
var t = Task.Factory.StartNew(() => { return GetTileData(imageTile.Item1, imageTile.Item2); }, TaskCreationOptions.LongRunning).ContinueWith(tsk =>
{
if (tsk.Status == TaskStatus.RanToCompletion)
{
var response = tsk.Result.Result.Content.ReadAsByteArrayAsync().Result;
images.Add(imageTile, Image.FromStream(new MemoryStream(response)));
}
else
{
failedTiles.Add(imageTile);
}
});
tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());
if (failedTiles.Count > 0)
{
Console.WriteLine($"Retrying {failedTiles.Count} requests");
Thread.Sleep(500);
Dictionary<Tuple<int, int>, Image> retriedImages = GetTilesParallel(failedTiles, retry++);
foreach (KeyValuePair<Tuple<int, int>, Image> retriedImage in retriedImages)
{
images.Add(retriedImage.Key, retriedImage.Value);
}
}
return images;
}
这是实际执行 HTTP 请求的方法(我知道这不是最优的或干净的,但我首先尝试让一些东西工作)。
private async Task<HttpResponseMessage> GetTileData(int tileColumn, int tileRow)
{
WMTSSettings settings = Service.Settings;
Dictionary<string, string> requestParams = new Dictionary<string, string>();
requestParams.Add("Request", "GetTile");
requestParams.Add("Style", "Default");
requestParams.Add("Service", "WMTS");
requestParams.Add("Version", this.Service.Version);
requestParams.Add("TileMatrixSet", settings.WMTSTileMatrixSet);
requestParams.Add("TileMatrix", settings.WMTSTileMatrixSet + ":" + settings.WMTSTileMatrix);
requestParams.Add("Format", settings.ImageFormat);
requestParams.Add("Layer", settings.Layer);
requestParams.Add("TileCol", tileColumn.ToString());
requestParams.Add("TileRow", tileRow.ToString());
string requestString = this.Service.BaseUri;
for (int i = 0; i < requestParams.Count; i++)
{
if (i == 0)
{
requestString += "?";
}
requestString += requestParams.ElementAt(i).Key;
requestString += "=";
requestString += requestParams.ElementAt(i).Value;
if (i != requestParams.Count - 1)
{
requestString += "&";
}
}
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken token = source.Token;
Task<HttpResponseMessage> response = HttppClient.GetAsync(requestString, token);
return await response;
}
我目前面临两个问题,为此我尝试了很多方法:
- 在当前设置中,在我的
ContinueWith
任务中,我遇到了一些奇怪的错误,告诉我“对象引用未设置为对象的实例”。即使 ContinueWith
任务中的 response
变量和 imageTile
变量不为空?
- 另一个问题是我仍然收到 TaskCancellationExceptions。但如果我是正确的,这些异常应该被 Continuation 任务捕获?
有人可以为我指明解决这个问题的正确方向吗?或者 Tasks 是否可行?
是的,任务是要走的路,但不是,ContinueWith
is not the way to go. This method is mostly a relic from the pre async-await era, and it's rarely useful nowadays. The same is true for the Task.Factory.StartNew
: you rarely need to use this method after the introduction of the Task.Run
方法。
创建下载切片数据所需的任务的一种简便方法是 LINQ Select
运算符。你可以这样使用它:
async Task<Dictionary<(int, int), Image>> GetAllTileDataAsync(List<(int, int)> tiles)
{
Task<(int, int, Image)>[] tasks = tiles.Select(async tile =>
{
(int tileColumn, int tileRow) = tile;
int retry = 0;
while (true)
{
try
{
using HttpResponseMessage response = await GetTileDataAsync(
tileColumn, tileRow);
response.EnsureSuccessStatusCode();
byte[] bytes = await response.Content.ReadAsByteArrayAsync();
Image image = Image.FromStream(new MemoryStream(bytes));
return (tileColumn, tileRow, image);
}
catch
{
if (retry >= 3) throw;
}
await Task.Delay(1000);
retry++;
}
}).ToArray();
(int, int, Image)[] results = await Task.WhenAll(tasks);
return results.ToDictionary(e => (e.Item1, e.Item2), e => e.Item3);
}
每个图块都投影到 Task<(int, int, Image)>
。任务的结果包含有关图块的所有初始信息和获取信息。这样就无需依赖危险的副作用来构建最终的 Dictionary
.
注意上面代码中没有任何 Task.Factory.StartNew
、.ContinueWith
、.Result
、.Wait()
和 Task.WaitAll
。所有这些方法在支持异步的现代应用程序中都是危险信号。一切都由 async/await 组成。 No threads are created, no threads are blocked,并且您的应用程序达到了最大的可扩展性和响应能力。
所以我在使用 Tasks 处理大量 HTTP 请求时遇到了一些困难。
我想做的是从 WMTS 创建一个大图像。对于那些不知道的人,WMTS 是一种 Web 地图瓦片服务。 所以基本上,您可以通过使用正确的 tileRow 和 tileColumn 发送请求来请求 256x256 的图像块。所以在这种情况下,我正在尝试构建包含数百甚至数千个这些图像块的图像。
为此,我创建了一个应用程序:
- 根据输入计算需要请求哪些瓦片。
- 创建一个列表,我可以使用它向 WMTS 发出正确的 HTTP 请求。
- 将这些请求发送到服务器并检索图像。
- 将图像拼接成一张大图像。这就是我们想要的结果。
正如您想象的那样,图块的数量呈指数级增长。这并没有真正影响 CPU 工作,但主要是 I/O 绑定工作。 因此,与其等待每个请求发送到 return,然后再发送下一个请求,我认为为此使用 Tasks 是个好主意。创建将处理每个单独请求的任务,并在所有任务完成后构建大图像。
所以这是我已经知道我要请求哪些图块的方法。这里我想递归发送带有任务的请求,直到所有数据都完成(最终采用最大重试机制)。
public Dictionary<Tuple<int, int>, Image> GetTilesParallel(List<Tuple<int, int>> tileMatrix, int retry = 0)
{
//The dictionary we will return
Dictionary<Tuple<int, int>, Image> images = new Dictionary<Tuple<int, int>, Image>();
//The dictionary that we will recursively request if tiles fail.
List<Tuple<int, int>> failedTiles = new List<Tuple<int, int>>();
//To track when tasks are finished
List<Task> tasks = new List<Task>();
foreach (var request in tileMatrix)
{
Tuple<int, int> imageTile = new Tuple<int, int>(request.Item1, request.Item2);
var t = Task.Factory.StartNew(() => { return GetTileData(imageTile.Item1, imageTile.Item2); }, TaskCreationOptions.LongRunning).ContinueWith(tsk =>
{
if (tsk.Status == TaskStatus.RanToCompletion)
{
var response = tsk.Result.Result.Content.ReadAsByteArrayAsync().Result;
images.Add(imageTile, Image.FromStream(new MemoryStream(response)));
}
else
{
failedTiles.Add(imageTile);
}
});
tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());
if (failedTiles.Count > 0)
{
Console.WriteLine($"Retrying {failedTiles.Count} requests");
Thread.Sleep(500);
Dictionary<Tuple<int, int>, Image> retriedImages = GetTilesParallel(failedTiles, retry++);
foreach (KeyValuePair<Tuple<int, int>, Image> retriedImage in retriedImages)
{
images.Add(retriedImage.Key, retriedImage.Value);
}
}
return images;
}
这是实际执行 HTTP 请求的方法(我知道这不是最优的或干净的,但我首先尝试让一些东西工作)。
private async Task<HttpResponseMessage> GetTileData(int tileColumn, int tileRow)
{
WMTSSettings settings = Service.Settings;
Dictionary<string, string> requestParams = new Dictionary<string, string>();
requestParams.Add("Request", "GetTile");
requestParams.Add("Style", "Default");
requestParams.Add("Service", "WMTS");
requestParams.Add("Version", this.Service.Version);
requestParams.Add("TileMatrixSet", settings.WMTSTileMatrixSet);
requestParams.Add("TileMatrix", settings.WMTSTileMatrixSet + ":" + settings.WMTSTileMatrix);
requestParams.Add("Format", settings.ImageFormat);
requestParams.Add("Layer", settings.Layer);
requestParams.Add("TileCol", tileColumn.ToString());
requestParams.Add("TileRow", tileRow.ToString());
string requestString = this.Service.BaseUri;
for (int i = 0; i < requestParams.Count; i++)
{
if (i == 0)
{
requestString += "?";
}
requestString += requestParams.ElementAt(i).Key;
requestString += "=";
requestString += requestParams.ElementAt(i).Value;
if (i != requestParams.Count - 1)
{
requestString += "&";
}
}
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken token = source.Token;
Task<HttpResponseMessage> response = HttppClient.GetAsync(requestString, token);
return await response;
}
我目前面临两个问题,为此我尝试了很多方法:
- 在当前设置中,在我的
ContinueWith
任务中,我遇到了一些奇怪的错误,告诉我“对象引用未设置为对象的实例”。即使ContinueWith
任务中的response
变量和imageTile
变量不为空? - 另一个问题是我仍然收到 TaskCancellationExceptions。但如果我是正确的,这些异常应该被 Continuation 任务捕获?
有人可以为我指明解决这个问题的正确方向吗?或者 Tasks 是否可行?
是的,任务是要走的路,但不是,ContinueWith
is not the way to go. This method is mostly a relic from the pre async-await era, and it's rarely useful nowadays. The same is true for the Task.Factory.StartNew
: you rarely need to use this method after the introduction of the Task.Run
方法。
创建下载切片数据所需的任务的一种简便方法是 LINQ Select
运算符。你可以这样使用它:
async Task<Dictionary<(int, int), Image>> GetAllTileDataAsync(List<(int, int)> tiles)
{
Task<(int, int, Image)>[] tasks = tiles.Select(async tile =>
{
(int tileColumn, int tileRow) = tile;
int retry = 0;
while (true)
{
try
{
using HttpResponseMessage response = await GetTileDataAsync(
tileColumn, tileRow);
response.EnsureSuccessStatusCode();
byte[] bytes = await response.Content.ReadAsByteArrayAsync();
Image image = Image.FromStream(new MemoryStream(bytes));
return (tileColumn, tileRow, image);
}
catch
{
if (retry >= 3) throw;
}
await Task.Delay(1000);
retry++;
}
}).ToArray();
(int, int, Image)[] results = await Task.WhenAll(tasks);
return results.ToDictionary(e => (e.Item1, e.Item2), e => e.Item3);
}
每个图块都投影到 Task<(int, int, Image)>
。任务的结果包含有关图块的所有初始信息和获取信息。这样就无需依赖危险的副作用来构建最终的 Dictionary
.
注意上面代码中没有任何 Task.Factory.StartNew
、.ContinueWith
、.Result
、.Wait()
和 Task.WaitAll
。所有这些方法在支持异步的现代应用程序中都是危险信号。一切都由 async/await 组成。 No threads are created, no threads are blocked,并且您的应用程序达到了最大的可扩展性和响应能力。