为什么绑定的 SUB 只从连接的 PUB 收到一条消息?
Why does a bound SUB receive only one message from a connecting PUB?
我正在为我的 ZeroMQ CLR namespace 制作示例,但是我对 PUB/SUB 有疑问。
为什么我只收到第一条消息?有时我收到 no 消息,如果我通过客户端调试(在 PubSub_Client(arg);
上)我收到 some 消息。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Security.Cryptography;
using ZeroMQ;
namespace ZeroMQ.Test
{
static partial class Program
{
static string PubSub_FrontendAddress = "tcp://127.0.0.1:2772";
public static void Main(string[] args)
{
if (args == null || args.Length < 1)
{
// say here were some arguments...
args = new string[] { "World" };
}
// Setup the ZContext
context = ZContext.Create();
CancellationTokenSource cancellor0 = null;
{
// Create the "Server" cancellor and threads
cancellor0 = new CancellationTokenSource();
var serverThread = new Thread(PubSub_Server);
serverThread.Start(cancellor0.Token);
serverThread.Join(64);
}
{
Thread.Sleep(1000);
Console.WriteLine("Starting...");
// foreach arg we are the Client, asking the Server
foreach (string arg in args)
{
PubSub_Client(arg);
// Thread.Sleep(1000);
}
Console.WriteLine("Ended...");
}
if (cancellor0 != null)
{
// Cancel the Server
cancellor0.Cancel();
}
// we could have done here context.Terminate()
}
static void PubSub_Server(object cancelluS)
{
var cancellus = (CancellationToken)cancelluS;
using (var socket = ZSocket.Create(context, ZSocketType.SUB))
{
socket.Bind(PubSub_FrontendAddress);
socket.SubscribeAll();
/* var poller = ZPollItem.Create(socket, (ZSocket _socket, out ZMessage message, out ZError _error) =>
{
while (null == (message = _socket.ReceiveMessage(/* ZSocketFlags.DontWait, * out _error)))
{
if (_error == ZError.EAGAIN)
{
_error = ZError.None;
Thread.Sleep(1);
continue;
}
throw new ZException(_error);
}
return true;
}); /**/
while (!cancellus.IsCancellationRequested)
{
ZError error;
ZMessage request;
/* if (!poller.TryPollIn(out request, out error, TimeSpan.FromMilliseconds(512)))
{
if (error == ZError.EAGAIN)
{
error = ZError.None;
Thread.Sleep(1);
continue;
}
throw new ZException(error);
} /**/
if (null == (request = socket.ReceiveMessage(ZSocketFlags.DontWait, out error)))
{
if (error == ZError.EAGAIN)
{
error = ZError.None;
Thread.Sleep(1);
continue;
}
throw new ZException(error);
} /**/
foreach (ZFrame frame in request)
{
string strg = frame.ReadString();
Console.WriteLine("{0} said hello!", strg);
}
}
socket.Unbind(PubSub_FrontendAddress);
}
}
static void PubSub_Client(string name)
{
using (var socket = ZSocket.Create(context, ZSocketType.PUB))
{
using (var crypto = new RNGCryptoServiceProvider())
{
var identity = new byte[8];
crypto.GetBytes(identity);
socket.Identity = identity;
}
socket.Connect(PubSub_FrontendAddress);
using (var request = new ZMessage())
{
request.Add(new ZFrame(name));
socket.Send(request);
}
socket.Disconnect(PubSub_FrontendAddress);
}
}
}
}
嗯...有一个 comment by Martin Sustrik: "The problem is that connecting is asynchronous and takes certain amount of time."
现在 Thread.Sleep(64)
- 并且有效...:[=13=]
static void PubSub_Client(string name)
{
using (var socket = ZSocket.Create(context, ZSocketType.PUB))
{
socket.Connect(PubSub_FrontendAddress);
Thread.Sleep(64);
using (var request = new ZMessage())
{
request.Add(new ZFrame(name));
socket.Send(request);
}
socket.Disconnect(PubSub_FrontendAddress);
}
}
你知道建立连接的更好方法吗?
我遇到了你的设计问题,这似乎是错误的:
单个订阅者和多个发布者是一个奇怪的选择。我相信你有充分的理由,但你应该说出那是什么。当从多个客户端向单个服务器发送消息时,通常使用 DEALER/ROUTER 套接字代替。 PUB/SUB 适用于少数发布者到大量订阅者。
连接、发送一条消息然后立即断开连接的客户端是另一个非常不寻常的用例,我希望它只是一个例子:
- 一方面,您很容易遇到延迟问题,即如果消息未在延迟超时内发送,则会在断开连接时被丢弃。 [我不知道你的语言绑定的默认延迟是多少,所以这可能是也可能不是问题,但你至少应该检查以确保它不是。]
- 另一方面,正如您已经发现的那样,连接到套接字所需的时间存在问题,如果在套接字正确连接之前发送它们,可能会导致 PUB 消息被丢弃。
如果您坚持以这种方式使用 PUB/SUB,您将需要一个带外协议来在发送发布消息之前同步 PUB 和 SUB 线程。在 zeromq 指南中有一些关于如何可靠 pub/sub 的示例。这将涉及同一线程中的第二组套接字来发送同步消息; DEALER 套接字不会丢弃消息,这就是为什么它们适用于该目的...
但是,DEALER/ROUTER 插座似乎是比 PUB/SUB 更好的选择,除非有一些尚未公开的设计要求。
我正在为我的 ZeroMQ CLR namespace 制作示例,但是我对 PUB/SUB 有疑问。
为什么我只收到第一条消息?有时我收到 no 消息,如果我通过客户端调试(在 PubSub_Client(arg);
上)我收到 some 消息。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Security.Cryptography;
using ZeroMQ;
namespace ZeroMQ.Test
{
static partial class Program
{
static string PubSub_FrontendAddress = "tcp://127.0.0.1:2772";
public static void Main(string[] args)
{
if (args == null || args.Length < 1)
{
// say here were some arguments...
args = new string[] { "World" };
}
// Setup the ZContext
context = ZContext.Create();
CancellationTokenSource cancellor0 = null;
{
// Create the "Server" cancellor and threads
cancellor0 = new CancellationTokenSource();
var serverThread = new Thread(PubSub_Server);
serverThread.Start(cancellor0.Token);
serverThread.Join(64);
}
{
Thread.Sleep(1000);
Console.WriteLine("Starting...");
// foreach arg we are the Client, asking the Server
foreach (string arg in args)
{
PubSub_Client(arg);
// Thread.Sleep(1000);
}
Console.WriteLine("Ended...");
}
if (cancellor0 != null)
{
// Cancel the Server
cancellor0.Cancel();
}
// we could have done here context.Terminate()
}
static void PubSub_Server(object cancelluS)
{
var cancellus = (CancellationToken)cancelluS;
using (var socket = ZSocket.Create(context, ZSocketType.SUB))
{
socket.Bind(PubSub_FrontendAddress);
socket.SubscribeAll();
/* var poller = ZPollItem.Create(socket, (ZSocket _socket, out ZMessage message, out ZError _error) =>
{
while (null == (message = _socket.ReceiveMessage(/* ZSocketFlags.DontWait, * out _error)))
{
if (_error == ZError.EAGAIN)
{
_error = ZError.None;
Thread.Sleep(1);
continue;
}
throw new ZException(_error);
}
return true;
}); /**/
while (!cancellus.IsCancellationRequested)
{
ZError error;
ZMessage request;
/* if (!poller.TryPollIn(out request, out error, TimeSpan.FromMilliseconds(512)))
{
if (error == ZError.EAGAIN)
{
error = ZError.None;
Thread.Sleep(1);
continue;
}
throw new ZException(error);
} /**/
if (null == (request = socket.ReceiveMessage(ZSocketFlags.DontWait, out error)))
{
if (error == ZError.EAGAIN)
{
error = ZError.None;
Thread.Sleep(1);
continue;
}
throw new ZException(error);
} /**/
foreach (ZFrame frame in request)
{
string strg = frame.ReadString();
Console.WriteLine("{0} said hello!", strg);
}
}
socket.Unbind(PubSub_FrontendAddress);
}
}
static void PubSub_Client(string name)
{
using (var socket = ZSocket.Create(context, ZSocketType.PUB))
{
using (var crypto = new RNGCryptoServiceProvider())
{
var identity = new byte[8];
crypto.GetBytes(identity);
socket.Identity = identity;
}
socket.Connect(PubSub_FrontendAddress);
using (var request = new ZMessage())
{
request.Add(new ZFrame(name));
socket.Send(request);
}
socket.Disconnect(PubSub_FrontendAddress);
}
}
}
}
嗯...有一个 comment by Martin Sustrik: "The problem is that connecting is asynchronous and takes certain amount of time."
现在 Thread.Sleep(64)
- 并且有效...:[=13=]
static void PubSub_Client(string name)
{
using (var socket = ZSocket.Create(context, ZSocketType.PUB))
{
socket.Connect(PubSub_FrontendAddress);
Thread.Sleep(64);
using (var request = new ZMessage())
{
request.Add(new ZFrame(name));
socket.Send(request);
}
socket.Disconnect(PubSub_FrontendAddress);
}
}
你知道建立连接的更好方法吗?
我遇到了你的设计问题,这似乎是错误的:
单个订阅者和多个发布者是一个奇怪的选择。我相信你有充分的理由,但你应该说出那是什么。当从多个客户端向单个服务器发送消息时,通常使用 DEALER/ROUTER 套接字代替。 PUB/SUB 适用于少数发布者到大量订阅者。
连接、发送一条消息然后立即断开连接的客户端是另一个非常不寻常的用例,我希望它只是一个例子:
- 一方面,您很容易遇到延迟问题,即如果消息未在延迟超时内发送,则会在断开连接时被丢弃。 [我不知道你的语言绑定的默认延迟是多少,所以这可能是也可能不是问题,但你至少应该检查以确保它不是。]
- 另一方面,正如您已经发现的那样,连接到套接字所需的时间存在问题,如果在套接字正确连接之前发送它们,可能会导致 PUB 消息被丢弃。
如果您坚持以这种方式使用 PUB/SUB,您将需要一个带外协议来在发送发布消息之前同步 PUB 和 SUB 线程。在 zeromq 指南中有一些关于如何可靠 pub/sub 的示例。这将涉及同一线程中的第二组套接字来发送同步消息; DEALER 套接字不会丢弃消息,这就是为什么它们适用于该目的...
但是,DEALER/ROUTER 插座似乎是比 PUB/SUB 更好的选择,除非有一些尚未公开的设计要求。