C# Web MVC 5项目中连续阅读SslStream
Read SslStream continuously in C# Web MVC 5 project
tl;dr: "I would like to read an SslStream continuously in C# but I can't figure out the best way."
一些背景:我正在从一个流中获取股票数据,我想在 Web 界面中显示这些数据。流每 5 秒发送一次心跳,您还可以 subscribe/unsubscribe 股票价格和新闻等
https://api.test.nordnet.se/next/2/api-docs/docs/feeds
目前我正在使用 MSDN 示例中的 SslTcpClient 来读取和写入流,它工作正常。
https://msdn.microsoft.com/en-us/library/system.net.security.sslstream.aspx
我的问题是流不断发送数据,我不确定如何使用它并以最佳方式呈现它。
我现在的解决方案是每五秒对 "clear" 流进行一次 ajax 调用并获取新数据。这感觉不像是一个可靠的解决方案,我经常收到错误 "The Read method cannot be called when another read operation is pending."。
更新:
@phuzi:下面 ajax 和 SignalR 调用之间的区别。不幸的是,这并不能帮助我连续读取流,因为我仍然需要每五秒钟调用一次该方法。一个解决方案可能是 Hangfire 与 SignalR 结合使用,尽管我不知道如何最好地实现它。
$.connection.hub.start().done(function () {
setInterval(function () {
stockHub.server.getHeartBeat();
}, 5000);
});
setInterval(function () {
$.ajax({
url: '@Url.Action("getHeartBeat")',
type: 'POST',
data: {
},
success: function (message) {
console.log(message);
},
error: function () {
}
);
}, 5000);
@usr:这是一些示例数据:
我使用此字符串订阅特定市场的股票深度
"{"cmd":"subscribe", "args":{"t":"depth", "i":"5110", "m":11}}\n"
然后我读取提要并向我的读取方法添加一个预期值,响应应包含我要查找的字符串,例如:
response.Contains("{"type":"depth","data":{"i":"5110","m":11")
然后我在 return 中得到一个看起来像这样的字符串:
"{"type":"depth","data":{"i":"5110","m":11,"tick_timestamp":1439894747189,"bid1":1.02,"bid_volume1":734827,"bid_orders1":30,"ask1":1.03,"ask_volume1":393598,"ask_orders1":8,"bid2":1.01,"bid_volume2":805705,"bid_orders2":35,"ask2":1.04,"ask_volume2":404815,"ask_orders2":15,"bid3":1.00,"bid_volume3":1387177,"bid_orders3":62,"ask3":1.05,"ask_volume3":601579,"ask_orders3":29,"bid4":0.995,"bid_volume4":123610,"bid_orders4":9,"ask4":1.06,"ask_volume4":313060,"ask_orders4":15,"bid5":0.990,"bid_volume5":386543,"bid_orders5":31,"ask5":1.07,"ask_volume5":741100,"ask_orders5":11}}"
使用锁解决了错误 "The Read method cannot be called when another read operation is pending."。
private static readonly object _readBytesLock = new object();
private static volatile bool _readingBytes = false;
public static string ReadMessage(SslStream sslStream, string expectedValue = "heartbeat")
{
// Read the message sent by the server.
// The end of the message is signaled using the
// "<EOF>" marker.
string message;
byte[] buffer = new byte[2048];
StringBuilder messageData = new StringBuilder();
int bytes = -1;
do
{
//bytes = sslStream.Read(buffer, 0, buffer.Length);
bytes = GetBytes(sslStream, buffer);
// Use Decoder class to convert from bytes to UTF8
// in case a character spans two buffers.
Decoder decoder = Encoding.UTF8.GetDecoder();
char[] chars = new char[decoder.GetCharCount(buffer, 0, bytes)];
decoder.GetChars(buffer, 0, bytes, chars, 0);
message = messageData.Append(chars).ToString();
Debug.WriteLine(messageData);
if (message.Contains(expectedValue))
{
return message;
}
if (Regex.Matches(messageData.ToString(), "heartbeat").Count >= 3)
{
Debug.WriteLine("Something went wrong, 3 heartbeats received and nothing with the expected value: " + expectedValue);
return message;
}
} while (bytes != 0);
return message;
}
public static int GetBytes(SslStream sslStream, byte[] buffer)
{
try
{
if (buffer == null)
return -1;
var bytes = -1;
lock (_readBytesLock)
{
if (!_readingBytes)
{
_readingBytes = true;
bytes = sslStream.Read(buffer, 0, buffer.Length);
}
_readingBytes = false;
return bytes;
}
}
catch (Exception ex)
{
var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
Debug.WriteLine("Method " + methodName + " failed " + ex.Message);
return 0;
//throw;
}
}
运行良好的最终代码。非常感谢@usr 和其他所有人的帮助。
private Task _feedTask;
private SslStream _sslStream;
private StreamReader _streamReader;
private static readonly log4net.ILog _logger
= log4net.LogManager.GetLogger(
System.Reflection.MethodBase.GetCurrentMethod()
.DeclaringType);
public void CreateNewTaskAndStartReadingFeed()
{
_feedTask = new Task(() => StartReadingFeed());
_feedTask.Start();
}
public void StartReadingFeed()
{
try
{
while (true)
{
var message = GetStreamMessage();
if (message != null)
{
Debug.WriteLine("StartReadingFeed message: " + message);
OnReceivedSomething(message);
}
else
{
Debug.WriteLine("StartReadingFeed message was null");
}
}
}
catch (Exception ex)
{
var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
Debug.WriteLine($"Method {methodName} failed " + ex.Message);
Debug.WriteLine("Creating a new task and starts to reed feed again");
_logger.Error("StartReadingFeed failed", ex);
CreateNewTaskAndStartReadingFeed();
}
}
public string GetStreamMessage()
{
try
{
return _sslStream.CanRead ? _streamReader.ReadLine() : null;
}
catch (Exception ex)
{
var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
Debug.WriteLine($"Method {methodName} failed " + ex.Message);
_logger.Error($"{methodName} failed.", ex);
return null;
}
}
tl;dr: "I would like to read an SslStream continuously in C# but I can't figure out the best way."
一些背景:我正在从一个流中获取股票数据,我想在 Web 界面中显示这些数据。流每 5 秒发送一次心跳,您还可以 subscribe/unsubscribe 股票价格和新闻等
https://api.test.nordnet.se/next/2/api-docs/docs/feeds
目前我正在使用 MSDN 示例中的 SslTcpClient 来读取和写入流,它工作正常。
https://msdn.microsoft.com/en-us/library/system.net.security.sslstream.aspx
我的问题是流不断发送数据,我不确定如何使用它并以最佳方式呈现它。
我现在的解决方案是每五秒对 "clear" 流进行一次 ajax 调用并获取新数据。这感觉不像是一个可靠的解决方案,我经常收到错误 "The Read method cannot be called when another read operation is pending."。
更新:
@phuzi:下面 ajax 和 SignalR 调用之间的区别。不幸的是,这并不能帮助我连续读取流,因为我仍然需要每五秒钟调用一次该方法。一个解决方案可能是 Hangfire 与 SignalR 结合使用,尽管我不知道如何最好地实现它。
$.connection.hub.start().done(function () {
setInterval(function () {
stockHub.server.getHeartBeat();
}, 5000);
});
setInterval(function () {
$.ajax({
url: '@Url.Action("getHeartBeat")',
type: 'POST',
data: {
},
success: function (message) {
console.log(message);
},
error: function () {
}
);
}, 5000);
@usr:这是一些示例数据:
我使用此字符串订阅特定市场的股票深度
"{"cmd":"subscribe", "args":{"t":"depth", "i":"5110", "m":11}}\n"
然后我读取提要并向我的读取方法添加一个预期值,响应应包含我要查找的字符串,例如:
response.Contains("{"type":"depth","data":{"i":"5110","m":11")
然后我在 return 中得到一个看起来像这样的字符串:
"{"type":"depth","data":{"i":"5110","m":11,"tick_timestamp":1439894747189,"bid1":1.02,"bid_volume1":734827,"bid_orders1":30,"ask1":1.03,"ask_volume1":393598,"ask_orders1":8,"bid2":1.01,"bid_volume2":805705,"bid_orders2":35,"ask2":1.04,"ask_volume2":404815,"ask_orders2":15,"bid3":1.00,"bid_volume3":1387177,"bid_orders3":62,"ask3":1.05,"ask_volume3":601579,"ask_orders3":29,"bid4":0.995,"bid_volume4":123610,"bid_orders4":9,"ask4":1.06,"ask_volume4":313060,"ask_orders4":15,"bid5":0.990,"bid_volume5":386543,"bid_orders5":31,"ask5":1.07,"ask_volume5":741100,"ask_orders5":11}}"
使用锁解决了错误 "The Read method cannot be called when another read operation is pending."。
private static readonly object _readBytesLock = new object();
private static volatile bool _readingBytes = false;
public static string ReadMessage(SslStream sslStream, string expectedValue = "heartbeat")
{
// Read the message sent by the server.
// The end of the message is signaled using the
// "<EOF>" marker.
string message;
byte[] buffer = new byte[2048];
StringBuilder messageData = new StringBuilder();
int bytes = -1;
do
{
//bytes = sslStream.Read(buffer, 0, buffer.Length);
bytes = GetBytes(sslStream, buffer);
// Use Decoder class to convert from bytes to UTF8
// in case a character spans two buffers.
Decoder decoder = Encoding.UTF8.GetDecoder();
char[] chars = new char[decoder.GetCharCount(buffer, 0, bytes)];
decoder.GetChars(buffer, 0, bytes, chars, 0);
message = messageData.Append(chars).ToString();
Debug.WriteLine(messageData);
if (message.Contains(expectedValue))
{
return message;
}
if (Regex.Matches(messageData.ToString(), "heartbeat").Count >= 3)
{
Debug.WriteLine("Something went wrong, 3 heartbeats received and nothing with the expected value: " + expectedValue);
return message;
}
} while (bytes != 0);
return message;
}
public static int GetBytes(SslStream sslStream, byte[] buffer)
{
try
{
if (buffer == null)
return -1;
var bytes = -1;
lock (_readBytesLock)
{
if (!_readingBytes)
{
_readingBytes = true;
bytes = sslStream.Read(buffer, 0, buffer.Length);
}
_readingBytes = false;
return bytes;
}
}
catch (Exception ex)
{
var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
Debug.WriteLine("Method " + methodName + " failed " + ex.Message);
return 0;
//throw;
}
}
运行良好的最终代码。非常感谢@usr 和其他所有人的帮助。
private Task _feedTask;
private SslStream _sslStream;
private StreamReader _streamReader;
private static readonly log4net.ILog _logger
= log4net.LogManager.GetLogger(
System.Reflection.MethodBase.GetCurrentMethod()
.DeclaringType);
public void CreateNewTaskAndStartReadingFeed()
{
_feedTask = new Task(() => StartReadingFeed());
_feedTask.Start();
}
public void StartReadingFeed()
{
try
{
while (true)
{
var message = GetStreamMessage();
if (message != null)
{
Debug.WriteLine("StartReadingFeed message: " + message);
OnReceivedSomething(message);
}
else
{
Debug.WriteLine("StartReadingFeed message was null");
}
}
}
catch (Exception ex)
{
var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
Debug.WriteLine($"Method {methodName} failed " + ex.Message);
Debug.WriteLine("Creating a new task and starts to reed feed again");
_logger.Error("StartReadingFeed failed", ex);
CreateNewTaskAndStartReadingFeed();
}
}
public string GetStreamMessage()
{
try
{
return _sslStream.CanRead ? _streamReader.ReadLine() : null;
}
catch (Exception ex)
{
var methodName = System.Reflection.MethodBase.GetCurrentMethod().Name;
Debug.WriteLine($"Method {methodName} failed " + ex.Message);
_logger.Error($"{methodName} failed.", ex);
return null;
}
}