ZeroMQ thread/task 建议
ZeroMQ thread/task advice
我有一个任务是 运行 这个方法用于我的 Sub/Pub 模式。我看到的问题是这个循环将挂起,直到收到数据。如果我想更改主题或 IP,我无法结束此任务。如果我使用一个线程,我可以杀死它——但这似乎是一个丑陋的解决方案。有没有好的异步方法来设置我的订阅者的接收端。
ReceiveTask = new Task(ReceiveData);
ReceiveTask.Start();
private void ReceiveData()
{
while (true)
{
byte[] messageTopicReceived = SubSocket.ReceiveFrameBytes();
byte[] messageReceived = SubSocket.ReceiveFrameBytes();
}
//Code that uses byte array and do stuff
}
您可以改为调用 Try 扩展,它具有以下签名:
public static bool TryReceiveFrameBytes([NotNull] this IReceivingSocket socket, TimeSpan timeout, out byte[] bytes)
这将允许您将 TimeSpan.Zero
作为 timeout
传递,这将使它成为一个非阻塞调用。使用 while(true)
可以正常工作,但您可能需要在其中添加 Task.Delay
或 Thread.Sleep
。 Task.Delay
最有效,因为它接受 CancellationToken
。可以从不同的线程取消该令牌以快速退出循环。这些方面的内容:
private void ReceiveData(CancellationToken cancellationToken)
{
while (true)
{
bool receivedTopicMessage = SubSocket.TryReceiveFrameBytes(TimeSpan.Zero, out byte[] messageTopicReceived);
bool receivedMessage = SubSocket.TryReceiveFrameBytes(TimeSpan.Zero, out byte[] messageReceived);
Task.Delay(1000, cancellationToken);
if (cancellationToken.IsCancellationRequested)
{
return;
}
}
//Code that uses byte array and do stuff
}
我有一个任务是 运行 这个方法用于我的 Sub/Pub 模式。我看到的问题是这个循环将挂起,直到收到数据。如果我想更改主题或 IP,我无法结束此任务。如果我使用一个线程,我可以杀死它——但这似乎是一个丑陋的解决方案。有没有好的异步方法来设置我的订阅者的接收端。
ReceiveTask = new Task(ReceiveData);
ReceiveTask.Start();
private void ReceiveData()
{
while (true)
{
byte[] messageTopicReceived = SubSocket.ReceiveFrameBytes();
byte[] messageReceived = SubSocket.ReceiveFrameBytes();
}
//Code that uses byte array and do stuff
}
您可以改为调用 Try 扩展,它具有以下签名:
public static bool TryReceiveFrameBytes([NotNull] this IReceivingSocket socket, TimeSpan timeout, out byte[] bytes)
这将允许您将 TimeSpan.Zero
作为 timeout
传递,这将使它成为一个非阻塞调用。使用 while(true)
可以正常工作,但您可能需要在其中添加 Task.Delay
或 Thread.Sleep
。 Task.Delay
最有效,因为它接受 CancellationToken
。可以从不同的线程取消该令牌以快速退出循环。这些方面的内容:
private void ReceiveData(CancellationToken cancellationToken)
{
while (true)
{
bool receivedTopicMessage = SubSocket.TryReceiveFrameBytes(TimeSpan.Zero, out byte[] messageTopicReceived);
bool receivedMessage = SubSocket.TryReceiveFrameBytes(TimeSpan.Zero, out byte[] messageReceived);
Task.Delay(1000, cancellationToken);
if (cancellationToken.IsCancellationRequested)
{
return;
}
}
//Code that uses byte array and do stuff
}