如何在 C# 中对 websocket 数据快速处理和执行操作?

How to handle and perform actions fast on websocket data in c#?

我正在使用 websocket 连接到第三方数据馈送提供商的服务器。 对于 websocket 连接,我的代码是:

this.websocket = new WebSocket("wss://socket.polygon.io/stocks", sslProtocols: SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls);

因此,当建立连接后,我们每分钟收到近 70,000 到 1,00,000 条记录。所以在那之后我们将这些响应分成两部分并将其存储在它的单独文件中。就像如果我们收到 AAPL 的数据,然后我们将该数据存储到 AAPL 的文件中。与 FB、MSFT、IBM、QQQ 等相同。我们总共有 10,000 个文件,我们需要一次处理并根据它存储实时记录。

public static string tempFile = @"D:\TempFileForLiveMarket\tempFileStoreLiveSymbols.txt";
public static System.IO.StreamWriter w;
private void websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
{
  using (w = System.IO.File.AppendText(tempFile))
  {
     Log(e.Message, w);
  }
  using (System.IO.StreamReader r = System.IO.File.OpenText(tempFile))
  {
     DumpLog(r);
  }
}

public static void Log(string responseMessage, System.IO.TextWriter w)
{
     w.WriteLine(responseMessage);
}

public static void DumpLog(System.IO.StreamReader r)
{
  string line;
  while ((line = r.ReadLine()) != null)
  {
     WriteRecord(line);
  }
}

public static void WriteRecord(string data)
{
   List<LiveData> ld = JsonConvert.DeserializeObject<List<LiveData>>(data);
   var filterData = ld.Where(x => symbolList.Contains(x.sym));
   List<string> fileLines = new List<string>();
   foreach (var item in filterData)
   {
      var fileName = @"D:\SymbolsData\"+item.sym+ "_day_Aggregate.txt";
      fileLines = File.ReadAllLines(fileName).AsParallel().Skip(1).ToList();
      if (fileLines.Count > 1)
      {
         var lastLine = fileLines.Last();
         if (!lastLine.Contains(item.sym))
         {
               fileLines.RemoveAt(fileLines.Count - 1);
         }
      }
      fileLines.Add(item.sym + "," + item.s + "," + item.p + "-----");
      System.IO.File.WriteAllLines(fileName, fileLines);
   }
}

因此,当建立 websocket 连接并使用我们的 10,000 个文件对实时市场数据执行操作时,速度会变慢,并且 websocket 连接会在几分钟后关闭并传递如下消息:

Websocket Error
Received an unexpected EOF or 0 bytes from the transport stream.
Connection Closed...

我正在执行整个过程,因为在下一阶段我需要对每个符号的实时价格执行技术分析。那么我该如何处理这种情况呢?我怎样才能使处理速度比这个处理速度快?以及如何停止连接关闭?

编辑后

我用 String Builder 替换流编写器和临时文件,如下所示,

public static StringBuilder sb = new StringBuilder();
public static System.IO.StringWriter sw;
private void websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
{
      sw = new System.IO.StringWriter(sb);
      sw.WriteLine(e.Message);
      Reader();
}

public static void Reader()
{
     System.IO.StringReader _sr = new System.IO.StringReader(sb.ToString());
     while (_sr.Peek() > -1)
     {
            WriteRecord(sb.ToString());
     }
     sb.Remove(0, sb.Length);
}

public static void WriteRecord(string data)
{
     List<LiveData> ld = JsonConvert.DeserializeObject<List<LiveData>>(data);
     foreach (var item in filterData)
     {
           var fileName = @"D:\SymbolsData\"+item.sym+ "_day_Aggregate.txt";
           fileLines = File.ReadAllLines(fileName).AsParallel().Skip(1).ToList();
           fileLines.RemoveAt(fileLines.Count - 1);
           fileLines.Add(item.sym + "," + item.s + "," + item.p)
           System.IO.File.WriteAllLines(fileName, fileLines);
      }
}

看起来你 append 每条消息到 tempFile,但是你处理 entire tempFile.这意味着你不断地重新处理 old 数据 plus 新记录,所以是的:它会逐渐花费越来越长的时间,直到它花了很长时间,以至于另一端厌倦了等待,并打断了你。我的建议:不要那样做。

还有 很多 你可以在实际处理每条记录时更有效地做一些事情,但与不断重新处理的开销相比,这是无关紧要的 一切.