Parellel.ForEach 在对象的方法之前返回,这使得调用速率受到限制 api

Parellel.ForEach returning before object's method which makes rate limited api calls

我正在为一个需要进行 api 调用的程序开发一个插件,我之前是同步进行所有调用的,虽然效果很好,但速度很慢。

为了解决这个问题,我尝试使调用异步,我每秒可以调用 10 个,所以我尝试了以下操作:

            Parallel.ForEach(
                             items.Values,
                             new ParallelOptions { MaxDegreeOfParallelism = 10 },
                             async item => {
                             await item.UpdateMarketData(client, HQOnly.Checked, retainers);
                             await Task.Delay(1000);
                             }
                         );

client 是一个 HttpClient 对象,其余部分用于构建 api 调用或对 api 调用结果所做的工作。每次调用 item.UpdateMarketData() 1 次,并且只调用 1 次 api。

这段代码似乎很快就完成了,据我所知,程序应该等待 Parallel.ForEach() 完成后再继续。

item.UpdateMarketData()应该设置的数据也没有设置。为了确保,我什至将 MaxDegreeOfParallelism = 1 和延迟设置为 3 秒,尽管有大约 44 个项目要经过,它仍然很快完成。任何帮助将不胜感激。

UpdateMarketData() 包含在下面以防相关

public async Task UpdateMarketData(TextBox DebugTextBox,HttpClient client, bool HQOnly, List<string> retainers)
    {
        HttpResponseMessage sellers_result = null;
        try
        {
            sellers_result = await client.GetAsync(String.Format("www.apiImCalling/items/{0}?key=secretapikey", ID));

        }
        catch (Exception e)
        {
            System.Windows.Forms.MessageBox.Show(String.Format("{0} Exception caught.", e));
            sellers_result = null;
        }
        var results = JsonConvert.DeserializeObject<RootObjectMB>(sellers_result.Content.ReadAsStringAsync().Result);
        int count = 0;
        OnMB = false;
        LowestOnMB = false;
        LowestPrice = int.MaxValue;
        try
        {
            foreach (var x in results.Prices)
            {

                if (x.IsHQ | !(HQOnly && RequireHQ))
                {
                    count++;
                    if (count == 1)
                    {
                        LowestPrice = x.PricePerUnit;
                    }
                    if (retainers.Contains(x.RetainerName))
                    {
                        Retainer = x.RetainerName;
                        OnMB = true;
                        Price = x.PricePerUnit;
                        if (count == 1)
                        {
                            LowestOnMB = true;
                        }
                    }
                    if (LowestPrice == x.PricePerUnit && x.RetainerName != Retainer)
                    {
                        LowestOnMB = false;
                    }

                }

            }

        }
        catch (Exception e)
        {
            System.Windows.Forms.MessageBox.Show(String.Format("{0} Exception caught.", e));

        }
    }

而不是 parallel.for 循环,您可以使用 Task 并等待所有任务完成。

var tasks = new List<Task>();
foreach (var val in items.Values)
    tasks.Add(Task.Factory.StartNew(val.UpdateMarketData(client, HQOnly.Checked, retainers)));

 try
 {
    // Wait for all the tasks to finish.
    Task.WaitAll(tasks.ToArray());
    //make use of WhenAll method if you dont want to block thread, and want to use async/await 

    Console.WriteLine("update completed");
  }
  catch (AggregateException e)
  {
     Console.WriteLine("\nThe following exceptions have been thrown by WaitAll(): (THIS WAS EXPECTED)");
    for (int j = 0; j < e.InnerExceptions.Count; j++)
    {
        Console.WriteLine("\n-------------------------------------------------\n{0}", e.InnerExceptions[j].ToString());
    }
  }

async 不适用于 Parallel。一个是异步,一个是并行,这是两种完全不同的并发方式。

要限制异步操作​​的并发,使用SemaphoreSlim。例如:

var mutex = new SemaphoreSlim(10);
var tasks = items.Values.Select(item => DoUpdateMarketData(item)).ToList();
await Task.WhenAll(tasks);

async Task DoUpdateMarketData(Item item)
{
  await mutex.WaitAsync();
  try
  {
    await item.UpdateMarketData(client, HQOnly.Checked, retainers);
    await Task.Delay(1000);
  }
  finally { mutex.Release(); }
}

您可能会发现 my book 有帮助;这在食谱 11.5 中有所介绍。