用于向 Web 服务发出 http post 请求的多线程

Multithreading for making http post requests to web service

我想在 C# 中向 Web 服务发送多个 HTTP POST 请求。例如,如果 n=3,则应同时发出来自 3 个 xml 文件的 HTTP POST 请求,并且响应应该写入文件中。一旦前 3 个请求完成,才会发出接下来的 3 个请求。所以我编写了以下代码,但一开始我得到的是随机输出。而现在,我要么在内层 for 循环中遇到索引超出范围的异常,要么遇到内部服务器错误 (500)。请建议适当的更改。我正在使用 .NET 4.0

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Threading;
using System.Xml;
using System.Net;
using System.Threading.Tasks;
namespace ConsoleApplication5
{
    /// <summary>
    /// Posts every XML file found in a folder to a SOAP web service,
    /// sending at most a user-chosen number of requests at the same time.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Reads the batch size from the console, then posts the XML files batch by batch:
        /// one task per file, and the next batch only starts once the current one has finished.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("Enter the number");
            int n;
            if (!int.TryParse(Console.ReadLine(), out n) || n <= 0)
            {
                // A batch size of zero (or less) would never advance the outer loop below.
                Console.WriteLine("The number must be a positive integer.");
                return;
            }

            // Verbatim string: in a regular literal the backslash would escape the closing quote.
            string path = @"C:\";
            string[] files = Directory.GetFiles(path, "*.xml", SearchOption.TopDirectoryOnly);

            // A batch never holds more tasks than there are files.
            List<Task> tasks = new List<Task>(Math.Min(n, files.Length));

            for (int i = 0; i < files.Length; i += n)
            {
                for (int j = 0; j < n; j++)
                {
                    // Declared inside the loop body so that every lambda captures its own copy.
                    int x = i + j;

                    if (x >= files.Length)
                    {
                        // The last batch may be shorter than n.
                        break;
                    }

                    if (files[x] != null)
                    {
                        Task t = new Task(() => function(files[x]));
                        t.Start();
                        tasks.Add(t);
                    }
                }

                if (tasks.Count > 0)
                {
                    // Block until the whole batch is done; WaitAll rethrows task failures
                    // wrapped in an AggregateException.
                    Task.WaitAll(tasks.ToArray(), Timeout.Infinite);
                    tasks.Clear();
                }
            }

            // Keep the console window open once, after all batches have completed.
            Console.ReadKey();
        }

        /// <summary>
        /// Posts the XML document stored in <paramref name="temp"/> to the web service,
        /// overwrites that same file with the inner text of the response's root element
        /// and echoes the raw response to the console.
        /// </summary>
        /// <param name="temp">Path of the XML file to send; it is overwritten with the response text.</param>
        public static void function(string temp)
        {
            XmlDocument doc = new XmlDocument();
            doc.Load(temp);

            HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://10.76.22.135/wpaADws/ADService.asmx");
            request.ContentType = "text/xml;charset=\"utf-8\"";
            request.Accept = "text/xml";
            request.Method = "POST";

            // Dispose the request stream even if saving the document fails.
            using (Stream stream = request.GetRequestStream())
            {
                doc.Save(stream);
            }

            // Dispose the response so that its connection is released.
            using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
            using (StreamReader rd = new StreamReader(response.GetResponseStream()))
            {
                string soapResult = rd.ReadToEnd();
                doc.LoadXml(soapResult);
                File.WriteAllText(temp, doc.DocumentElement.InnerText);

                // No Console.ReadKey() here: it would block every worker until a key is pressed.
                Console.WriteLine(soapResult);
            }
        }
    }
}

像下面这样改用任务 (Task) 怎么样:

    // Process the files in batches of n: start one task per file of the batch,
    // then wait for the whole batch before moving on to the next one.
    List<Task> tasks = new List<Task>(n);

    for (int i = 0; i < files.Length; i += n)
    {
        for (int j = 0; j < n; j++)
        {
            // Declared inside the loop body so that every lambda captures its own copy.
            int x = i + j;

            // Skip slots past the end of the array (short last batch) and empty entries.
            if (x >= files.Length || files[x] == null)
            {
                continue;
            }

            Task t = new Task(() => function(files[x]));
            tasks.Add(t);
            t.Start();
        }

        // Nothing was started for this batch, so there is nothing to wait for.
        if (tasks.Count == 0)
        {
            continue;
        }

        // Wait without a time limit; a finite timeout could be used instead.
        Task.WaitAll(tasks.ToArray(), Timeout.Infinite);
        tasks.Clear();
    }

我试图在索引上更整洁一些...

此外,请注意,由于 C# 捕获 lambda 变量的方式,内循环中的 int x = i + j; 很重要。

如果问题在于索引运算难以跟踪,是否可以使用名称更有意义的索引变量?

    // Same batching as before, with self-describing index names:
    // filesIdx marks the start of a batch, tasksIdx the position inside it.
    List<Task> tasks = new List<Task>(taskCount);

    for (int filesIdx = 0; filesIdx < files.Length; filesIdx += taskCount)
    {
        for (int tasksIdx = 0; tasksIdx < taskCount; tasksIdx++)
        {
            // Fresh variable per iteration so that each lambda captures its own value.
            int index = filesIdx + tasksIdx;

            bool hasFile = index < files.Length && files[index] != null;
            if (!hasFile)
            {
                continue;
            }

            // Create and start the task in one step.
            tasks.Add(Task.Factory.StartNew(() => function(files[index])));
        }

        // An empty batch has nothing to wait for.
        if (tasks.Count == 0)
        {
            continue;
        }

        // Block until the whole batch has finished; a finite timeout could be used instead.
        Task.WaitAll(tasks.ToArray(), Timeout.Infinite);
        tasks.Clear();
    }

您在原始帖子中遇到的 IndexOutOfRangeException 是由于对最后一批文件的索引处理不当造成的。最后一批可能不完整,而您却将其视为固定大小的常规批次(在您的帖子中 n=3)。

由于您要转向 TPL 和 Task,我建议参考《Parallel Programming with Microsoft .NET》一书,以及其中的管道 (pipeline) 模式,它似乎非常适合您的场景。您可以将并发集合和生产者/消费者模式与管道结合起来使用,如下所示。BlockingCollection 保证可以并发地添加元素,而 BlockingCollection.GetConsumingEnumerable 调用会为您的集合生成一个消费型的阻塞枚举器。

// Bounded capacity passed to every BlockingCollection of the pipeline:
// at most this many items can wait between two consecutive stages.
const int BUFFER_SIZE = 3;
// Folder that is searched for *.xml files; "<whatever>" is a placeholder to be replaced.
const string XML_FOLDER_PATH = "<whatever>";


/// <summary>
/// Runs a five-stage producer/consumer pipeline: list the XML files, load them,
/// post each document to the web service, read the responses and print them.
/// Every stage runs in its own long-running task and hands its output to the next
/// stage through a bounded <c>BlockingCollection</c> of capacity <c>BUFFER_SIZE</c>.
/// The method returns once all five stages have finished.
/// </summary>
/// <remarks>
/// <c>BlockingCollection</c> lives in <c>System.Collections.Concurrent</c>.
/// Each producing stage calls <c>CompleteAdding</c> in a <c>finally</c> block so that the
/// consuming stage's enumeration ends even when the producer fails.
/// </remarks>
public static void Pipeline()
{
    var bufferXmlFileNames = new BlockingCollection<string>(BUFFER_SIZE);
    var bufferInputXmlDocuments = new BlockingCollection<XmlDocument>(BUFFER_SIZE);
    var bufferWebRequests = new BlockingCollection<HttpWebRequest>(BUFFER_SIZE);
    var bufferSoapResults = new BlockingCollection<string>(BUFFER_SIZE);

    var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

    // Stage 1: enumerate the xml file paths.
    var stage1 = f.StartNew(() =>
    {
        try
        {
            foreach (var xmlFilePath in Directory.GetFiles(XML_FOLDER_PATH, "*.xml", SearchOption.TopDirectoryOnly))
            {
                // Add blocks while the buffer is full.
                bufferXmlFileNames.Add(xmlFilePath);
            }
        }
        finally
        {
            // No more additions accepted.
            bufferXmlFileNames.CompleteAdding();
        }
    });

    // Stage 2: load each file into an XmlDocument.
    var stage2 = f.StartNew(() =>
    {
        try
        {
            foreach (var xmlFileName in bufferXmlFileNames.GetConsumingEnumerable())
            {
                XmlDocument doc = new XmlDocument();
                doc.Load(xmlFileName);
                bufferInputXmlDocuments.Add(doc);
            }
        }
        finally
        {
            bufferInputXmlDocuments.CompleteAdding();
        }
    });

    // Stage 3: build a POST request for each document and write its body.
    var stage3 = f.StartNew(() =>
    {
        try
        {
            foreach (var xmlDoc in bufferInputXmlDocuments.GetConsumingEnumerable())
            {
                HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://10.76.22.135/wpaADws/ADService.asmx");
                request.ContentType = "text/xml;charset=\"utf-8\"";
                request.Accept = "text/xml";
                request.Method = "POST";

                // Dispose the request stream even if saving the document fails.
                using (Stream stream = request.GetRequestStream())
                {
                    xmlDoc.Save(stream);
                }

                bufferWebRequests.Add(request);
            }
        }
        finally
        {
            bufferWebRequests.CompleteAdding();
        }
    });

    // Stage 4: read the response of each request.
    var stage4 = f.StartNew(() =>
    {
        try
        {
            foreach (var postRequest in bufferWebRequests.GetConsumingEnumerable())
            {
                // Dispose the response so that its connection is released.
                using (HttpWebResponse response = (HttpWebResponse)postRequest.GetResponse())
                using (StreamReader rd = new StreamReader(response.GetResponseStream()))
                {
                    string soapResult = rd.ReadToEnd();
                    bufferSoapResults.Add(soapResult);
                }
            }
        }
        finally
        {
            bufferSoapResults.CompleteAdding();
        }
    });

    // Stage 5: print each response.
    var stage5 = f.StartNew(() =>
    {
        foreach (var soapResult in bufferSoapResults.GetConsumingEnumerable())
        {
            Console.WriteLine(soapResult);
        }
    });

    // Informational only: periodically print the number of items waiting in each buffer.
    // This task can be removed without affecting the pipeline.
    const int displayIntervalMilliseconds = 250;
    var stageDisplay = f.StartNew(() =>
    {
        while (true)
        {
            Console.WriteLine("{0,10} {1,10} {2,10} {3,10}", bufferXmlFileNames.Count, bufferInputXmlDocuments.Count, bufferWebRequests.Count, bufferSoapResults.Count);

            // Stop once the last stage has completed.
            if (stage5.IsCompleted)
            {
                return;
            }

            // Pause between samples instead of spinning and flooding the console.
            Thread.Sleep(displayIntervalMilliseconds);
        }
    });

    // stageDisplay may be added to this list to also wait for the monitor's last sample.
    Task.WaitAll(stage1, stage2, stage3, stage4, stage5);
}

此代码有效。 解释:

  • 首先,用户给出 .xml 文件的源路径和目标路径。
  • Directory.GetFiles() 帮助我们以字符串数组的形式获取 .xml 文件。(我们必须传递 "*.xml" 作为搜索模式参数。)

  • 所以现在基本上发生的是,对于我们在源路径中获得的每个文件,都会创建一个线程。

  • 但是如果用户想一次发送"n"个请求,那么一次创建n个线程。
  • 并且除非前一个线程执行完毕,否则不会创建下一组线程。
  • 这是由 thread.Join() 确保的。
  • 并且在向 Web 服务发出请求后,我们通过 GetResponse() 获得响应,并将响应写入存储在目标路径中的 .xml 文件中。

     using System;
     using System.Collections.Generic;
     using System.Linq;
     using System.Text;
     using System.IO;
     using System.Threading;
     using System.Xml;
     using System.Net;
     namespace ConsoleApplication4
     {
         /// <summary>
         /// Posts every XML file of a source folder to a SOAP web service, a user-chosen number
         /// of requests at a time, and stores each response as a numbered file in a destination folder.
         /// </summary>
         class Program
         {
             // Number used for the next response file; shared by all worker threads.
             int flag = 1;
             // Folder that receives the "response N.xml" files.
             string destination;
             // Folder that is searched for the *.xml request files.
             string source;

             /// <summary>
             /// Asks for the batch size and the source/destination folders, then sends the files
             /// in batches: one thread per file, and each batch is joined before the next one starts.
             /// </summary>
             /// <param name="args">Command-line arguments (unused).</param>
             static void Main(string[] args)
             {
                 Console.ForegroundColor = ConsoleColor.Red;

                 Console.WriteLine("**************************** Send HTTP Post Requests **************************");
                 Program p = new Program();
                 Console.WriteLine("Enter the number of requests you want to send at a time");
                 int n;
                 if (!int.TryParse(Console.ReadLine(), out n) || n <= 0)
                 {
                     // A batch size of zero (or less) would never advance the batch loop below.
                     Console.WriteLine("The number of requests must be a positive integer.");
                     return;
                 }

                 Console.WriteLine("Enter Source");
                 p.source = Console.ReadLine();
                 Console.WriteLine("Enter Destination");
                 p.destination = Console.ReadLine();

                 string[] files = Directory.GetFiles(p.source, "*.xml", SearchOption.TopDirectoryOnly);
                 Thread[] thread = new Thread[files.Length];

                 int len = files.Length;
                 for (int i = 0; i < len; i += n)
                 {
                     // The last batch may hold fewer than n files.
                     int batchEnd = i + Math.Min(n, len - i);

                     for (int x = i; x < batchEnd; x++)
                     {
                         // Copy the loop variable: the lambda must not observe later increments of x.
                         var localx = x;
                         thread[x] = new Thread(() => function(files[localx], p));
                         thread[x].Start();
                         // Stagger the thread start-ups slightly.
                         Thread.Sleep(50);
                     }

                     // Join exactly the threads started for this batch. Starting at i (not x - n)
                     // avoids a negative index when the batch is shorter than n.
                     for (int y = i; y < batchEnd; y++)
                     {
                         thread[y].Join();
                     }
                 }

                 Console.ReadKey();
             }

             /// <summary>
             /// Posts the XML document stored in <paramref name="temp"/> to the web service, writes the
             /// inner text of the response's root element to the next "response N.xml" file in the
             /// destination folder of <paramref name="p"/> and echoes the raw response to the console.
             /// </summary>
             /// <param name="temp">Path of the XML file to send.</param>
             /// <param name="p">Program instance holding the destination folder and the response counter.</param>
             public static void function(string temp, Program p)
             {
                 XmlDocument doc = new XmlDocument();
                 doc.Load(temp);

                 // Take the next response number atomically so that two threads never share a file name.
                 // Increment returns the new value; the number handed out is the one before it.
                 int responseNumber = Interlocked.Increment(ref p.flag) - 1;
                 // Path.Combine inserts the directory separator when the destination lacks a trailing one.
                 string final_d = Path.Combine(p.destination, "response " + responseNumber + ".xml");

                 HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://10.76.22.135/wpaADws/ADService.asmx");
                 request.ContentType = "text/xml;charset=\"utf-8\"";
                 request.Accept = "text/xml";
                 request.Method = "POST";

                 // Dispose the request stream even if saving the document fails.
                 using (Stream stream = request.GetRequestStream())
                 {
                     doc.Save(stream);
                 }

                 // Dispose the response so that its connection is released.
                 using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
                 using (StreamReader rd = new StreamReader(response.GetResponseStream()))
                 {
                     string soapResult = rd.ReadToEnd();
                     doc.LoadXml(soapResult);
                     File.WriteAllText(final_d, doc.DocumentElement.InnerText);

                     Console.WriteLine(soapResult);
                 }
             }
         }
     }