C# Web MVC 5项目中连续阅读SslStream

Read SslStream continuously in C# Web MVC 5 project

tl;dr: "I would like to read an SslStream continuously in C# but I can't figure out the best way."

一些背景:我正在从一个流中获取股票数据,我想在 Web 界面中显示这些数据。流每 5 秒发送一次心跳,您还可以 subscribe/unsubscribe 股票价格和新闻等

https://api.test.nordnet.se/next/2/api-docs/docs/feeds

目前我正在使用 MSDN 示例中的 SslTcpClient 来读取和写入流,它工作正常。

https://msdn.microsoft.com/en-us/library/system.net.security.sslstream.aspx

我的问题是流不断发送数据,我不确定如何使用它并以最佳方式呈现它。

我现在的解决方案是每五秒对 "clear" 流进行一次 ajax 调用并获取新数据。这感觉不像是一个可靠的解决方案,我经常收到错误 "The Read method cannot be called when another read operation is pending."

更新:

@phuzi:下面 ajax 和 SignalR 调用之间的区别。不幸的是,这并不能帮助我连续读取流,因为我仍然需要每五秒钟调用一次该方法。一个解决方案可能是 Hangfire 与 SignalR 结合使用,尽管我不知道如何最好地实现它。

$.connection.hub.start().done(function () {
    setInterval(function () {
        stockHub.server.getHeartBeat();
    }, 5000);
});

setInterval(function () {
    $.ajax({
        url: '@Url.Action("getHeartBeat")',
        type: 'POST',
        data: {
        },
        success: function (message) {
            console.log(message);
        },
        error: function () {
        }
    );
}, 5000);

@usr:这是一些示例数据:

我使用此字符串订阅特定市场的股票深度

"{"cmd":"subscribe", "args":{"t":"depth", "i":"5110", "m":11}}\n"

然后我读取提要并向我的读取方法添加一个预期值,响应应包含我要查找的字符串,例如:

response.Contains("{"type":"depth","data":{"i":"5110","m":11")

然后我在 return 中得到一个看起来像这样的字符串:

"{"type":"depth","data":{"i":"5110","m":11,"tick_timestamp":1439894747189,"bid1":1.02,"bid_volume1":734827,"bid_orders1":30,"ask1":1.03,"ask_volume1":393598,"ask_orders1":8,"bid2":1.01,"bid_volume2":805705,"bid_orders2":35,"ask2":1.04,"ask_volume2":404815,"ask_orders2":15,"bid3":1.00,"bid_volume3":1387177,"bid_orders3":62,"ask3":1.05,"ask_volume3":601579,"ask_orders3":29,"bid4":0.995,"bid_volume4":123610,"bid_orders4":9,"ask4":1.06,"ask_volume4":313060,"ask_orders4":15,"bid5":0.990,"bid_volume5":386543,"bid_orders5":31,"ask5":1.07,"ask_volume5":741100,"ask_orders5":11}}"

使用锁解决了错误 "The Read method cannot be called when another read operation is pending."。

private static readonly object _readBytesLock = new object();

private static volatile bool _readingBytes = false;

public static string ReadMessage(SslStream sslStream, string expectedValue = "heartbeat")
{
    // Read the  message sent by the server. 
    // The end of the message is signaled using the 
    // "<EOF>" marker.
    string message;
    byte[] buffer = new byte[2048];
    StringBuilder messageData = new StringBuilder();
    int bytes = -1;
    do
    {
        //bytes = sslStream.Read(buffer, 0, buffer.Length);
        bytes = GetBytes(sslStream, buffer);
        // Use Decoder class to convert from bytes to UTF8 
        // in case a character spans two buffers.
        Decoder decoder = Encoding.UTF8.GetDecoder();
        char[] chars = new char[decoder.GetCharCount(buffer, 0, bytes)];
        decoder.GetChars(buffer, 0, bytes, chars, 0);
        message = messageData.Append(chars).ToString();
        Debug.WriteLine(messageData);
        if (message.Contains(expectedValue))
        {
            return message;
        }
        if (Regex.Matches(messageData.ToString(), "heartbeat").Count >= 3)
        {
            Debug.WriteLine("Something went wrong, 3 heartbeats received and nothing with the expected value: " + expectedValue);
            return message;
        }
    } while (bytes != 0);

    return message;
}

public static int GetBytes(SslStream sslStream, byte[] buffer)
{
    try
    {
        if (buffer == null)
            return -1;
        var bytes = -1;
        lock (_readBytesLock)
        {
            if (!_readingBytes)
            {
                _readingBytes = true;
                bytes = sslStream.Read(buffer, 0, buffer.Length);
            }
            _readingBytes = false;
            return bytes;
        }
    }
    catch (Exception ex)
    {
        var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
        Debug.WriteLine("Method " + methodName + " failed " + ex.Message);
        return 0;
        //throw;
    }
}

运行良好的最终代码。非常感谢@usr 和其他所有人的帮助。

private Task _feedTask;
private SslStream _sslStream;
private StreamReader _streamReader;
private static readonly log4net.ILog _logger
        = log4net.LogManager.GetLogger(
                System.Reflection.MethodBase.GetCurrentMethod()
                 .DeclaringType);

public void CreateNewTaskAndStartReadingFeed()
{
    _feedTask = new Task(() => StartReadingFeed());
    _feedTask.Start();
}

public void StartReadingFeed()
{
   try
   {
       while (true)
       {
           var message = GetStreamMessage();

           if (message != null)
           {
               Debug.WriteLine("StartReadingFeed message: " + message);
               OnReceivedSomething(message);
           }
           else
           {
               Debug.WriteLine("StartReadingFeed message was null");
           }
       }
   }
   catch (Exception ex)
   {
       var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
       Debug.WriteLine($"Method {methodName} failed " + ex.Message);
       Debug.WriteLine("Creating a new task and starts to reed feed again");
       _logger.Error("StartReadingFeed failed", ex);
       CreateNewTaskAndStartReadingFeed();
   }
}

public string GetStreamMessage()
{
    try
    {
        return _sslStream.CanRead ? _streamReader.ReadLine() : null;
    }
    catch (Exception ex)
    {
        var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
        Debug.WriteLine($"Method {methodName} failed " + ex.Message);
        _logger.Error($"{methodName} failed.", ex);
        return null;
    }
}