同一台机器上的 ZeroMQ PUB/SUB 拓扑
ZeroMQ PUB/SUB topology on the same machine
发布者是否可以使用 ZeroMQ 向同一台机器上的多个客户端发布消息?我想要一组客户端,每个客户端都可以使用 SocketType.REQ 和 SocketType.REP 进行标准 Request/Response 调用,但也可以使用 SocketType.SUB 和 SocketType.PUB.
接收通知
虽然我的版本只有一个发布者,但我已尝试实现此拓扑,取自 here。
这是我的发布者:
public class ZMQServerSmall
{
public static void main(String[] args)
{
try (ZContext context = new ZContext())
{
ZMQ.Socket rep = context.createSocket(SocketType.REP);
rep.bind("tcp://*:5555");
ZMQ.Socket pub = context.createSocket(SocketType.PUB);
pub.bind("tcp://*:7777");
while (!Thread.currentThread().isInterrupted())
{
String req = rep.recvStr(0);
rep.send(req + " response");
pub.sendMore("Message header");
pub.send("Message body");;
}
}
}
}
这是我的代理(我包含了一个监听器来尝试查看发生了什么):
public class ZMQForwarderSmall
{
public static void main(String[] args)
{
try
(
ZContext context = new ZContext();
)
{
ZMQ.Socket frontend = context.createSocket(SocketType.XSUB);
frontend.connect("tcp://*:7777");
ZMQ.Socket backend = context.createSocket(SocketType.XPUB);
backend.bind("tcp://*:6666");
IAttachedRunnable runnable = new Listener();
Socket listener = ZThread.fork(context, runnable);
ZMQ.proxy(frontend, backend, listener);
}
catch (Exception e)
{
System.err.println(e.getMessage());
}
}
private static class Listener implements IAttachedRunnable
{
@Override
public void run(Object[] args, ZContext ctx, Socket pipe)
{
while (true)
{
ZFrame frame = ZFrame.recvFrame(pipe);
if (frame == null)
break; // Interrupted
System.out.println(frame.toString());
frame.destroy();
}
}
}
}
这是我的订阅者:
public class ZMQClientSmall
{
public static void main(String[] args) throws IOException
{
String input;
try
(
ZContext context = new ZContext();
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))
)
{
ZMQ.Socket reqSocket = context.createSocket(SocketType.REQ);
reqSocket.connect("tcp://localhost:5555");
ZMQ.Socket subSocket = context.createSocket(SocketType.SUB);
subSocket.connect("tcp://localhost:6666");
subSocket.subscribe("".getBytes(ZMQ.CHARSET));
while ((input = stdIn.readLine()) != null)
{
reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
String response = reqSocket.recvStr(0);
String address = subSocket.recvStr(ZMQ.DONTWAIT);
String contents = subSocket.recvStr(ZMQ.DONTWAIT);
System.out.println("Notification received: " + address + " : " + contents);
}
}
}
}
这是测试。我打开四个终端; 1 个发布者、1 个代理和 2 个客户端。当我在两个客户端中的任何一个中发出请求时,我希望在两个客户端中都看到通知,但我只在发出请求的终端中看到通知。我知道两个客户端都使用相同的地址 (localhost:6666),但我希望代理能够解决该问题。
谁能看出这里有什么明显的错误吗?
Q : Is it possible for a publisher to publish to multiple clients on the same machine using ZeroMQ?
哦当然,是的。毫无疑问。
检查代码。执行顺序的责任在那里。在distributed-systems中总是如此。
一旦 [Client]-No1
实例 得到合理的 .readLine()
-ed input
它将跳入:
while ((input = stdIn.readLine()) != null)
{
reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
String response = reqSocket.recvStr(0);
String address = subSocket.recvStr(ZMQ.DONTWAIT);
String contents = subSocket.recvStr(ZMQ.DONTWAIT);
System.out.println( "Notification received: "
+ address + " : "
+ contents
);
}
接下来 .send()
-s 超过 REQ
并阻止(等待 REP
响应)
鉴于 [Client]-No2
实例 也得到了一个合理的手册 .readLine()
-ed input
它将跳入相同的 while(){...}
,但它只会再次阻塞等待 REP
-响应。在 -No1
从 REP
端得到服务后,任何时候都不会得到 .recv()
-ed,所以虽然 -No1
可能已经脱离阻塞-.recv()
,而不是 -No2
(对于任何下一个 REP
端响应(可能会但不需要),而 No1
已经进行到 PUB/SUB-.recv()
,它会收到(但是 never No2
),接下来冲进下一个阻塞-input
-feed 来自 .readLine()
等等,等等,等等,...,Ad Infinitum
因此,这些 SEQ
-of-In-Loop (REQ
)-部分后跟 (SUB
)-部分无论 N > 1
的 [Client]
实例,都有效地生成了 独家滴答- Tick-Tock 时钟机,相互阻塞 PUB
-ed 在 N
[= 中的独占传送106=]-交错顺序(不讲手动,.readLine()
-驱动,阻塞步)
ZMQServerSmall
不知道有什么问题,因为它 .send()
-s 为了任何 .recvStr()
-ed 交易对手在 REQ/REP
和 PUB
-s 上发送给所有交易对手(它们不会自动读取,但只有在手动 .readLine()
解锁之后才可以(在 REQ/REP
偶发之后(可能无限阻塞)步骤)可能 .recv()
它的下一个(到目前为止还没有读取消息部分(但是,我没有看到任何代码可以显式处理 [=38 上多部分标志的存在/不存在) =]端.recv()
操作)
while (!Thread.currentThread().isInterrupted())
{
String req = rep.recvStr(0);
rep.send(req + " response");
pub.sendMore("Message header");
pub.send("Message body");;
}
同时 ZMQServerSmall
在 PUB
广播通道中发送 ( N - 1 ) 倍的消息,因此 Tick-Tock -Tick-Tock MUTEX REQ/SUB
-loop-blocking "pendulum"不是2-State,而是N
-state on the receiving sides ( all receive同样的 PUB
-ed 消息流,
yet interleaved REQ/REP
MUTEX-步进)
发布者是否可以使用 ZeroMQ 向同一台机器上的多个客户端发布消息?我想要一组客户端,每个客户端都可以使用 SocketType.REQ 和 SocketType.REP 进行标准 Request/Response 调用,但也可以使用 SocketType.SUB 和 SocketType.PUB.
接收通知虽然我的版本只有一个发布者,但我已尝试实现此拓扑,取自 here。
这是我的发布者:
public class ZMQServerSmall
{
public static void main(String[] args)
{
try (ZContext context = new ZContext())
{
ZMQ.Socket rep = context.createSocket(SocketType.REP);
rep.bind("tcp://*:5555");
ZMQ.Socket pub = context.createSocket(SocketType.PUB);
pub.bind("tcp://*:7777");
while (!Thread.currentThread().isInterrupted())
{
String req = rep.recvStr(0);
rep.send(req + " response");
pub.sendMore("Message header");
pub.send("Message body");;
}
}
}
}
这是我的代理(我包含了一个监听器来尝试查看发生了什么):
public class ZMQForwarderSmall
{
public static void main(String[] args)
{
try
(
ZContext context = new ZContext();
)
{
ZMQ.Socket frontend = context.createSocket(SocketType.XSUB);
frontend.connect("tcp://*:7777");
ZMQ.Socket backend = context.createSocket(SocketType.XPUB);
backend.bind("tcp://*:6666");
IAttachedRunnable runnable = new Listener();
Socket listener = ZThread.fork(context, runnable);
ZMQ.proxy(frontend, backend, listener);
}
catch (Exception e)
{
System.err.println(e.getMessage());
}
}
private static class Listener implements IAttachedRunnable
{
@Override
public void run(Object[] args, ZContext ctx, Socket pipe)
{
while (true)
{
ZFrame frame = ZFrame.recvFrame(pipe);
if (frame == null)
break; // Interrupted
System.out.println(frame.toString());
frame.destroy();
}
}
}
}
这是我的订阅者:
public class ZMQClientSmall
{
public static void main(String[] args) throws IOException
{
String input;
try
(
ZContext context = new ZContext();
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))
)
{
ZMQ.Socket reqSocket = context.createSocket(SocketType.REQ);
reqSocket.connect("tcp://localhost:5555");
ZMQ.Socket subSocket = context.createSocket(SocketType.SUB);
subSocket.connect("tcp://localhost:6666");
subSocket.subscribe("".getBytes(ZMQ.CHARSET));
while ((input = stdIn.readLine()) != null)
{
reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
String response = reqSocket.recvStr(0);
String address = subSocket.recvStr(ZMQ.DONTWAIT);
String contents = subSocket.recvStr(ZMQ.DONTWAIT);
System.out.println("Notification received: " + address + " : " + contents);
}
}
}
}
这是测试。我打开四个终端; 1 个发布者、1 个代理和 2 个客户端。当我在两个客户端中的任何一个中发出请求时,我希望在两个客户端中都看到通知,但我只在发出请求的终端中看到通知。我知道两个客户端都使用相同的地址 (localhost:6666),但我希望代理能够解决该问题。
谁能看出这里有什么明显的错误吗?
Q : Is it possible for a publisher to publish to multiple clients on the same machine using ZeroMQ?
哦当然,是的。毫无疑问。
检查代码。执行顺序的责任在那里。在distributed-systems中总是如此。
一旦 [Client]-No1
实例 得到合理的 .readLine()
-ed input
它将跳入:
while ((input = stdIn.readLine()) != null)
{
reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
String response = reqSocket.recvStr(0);
String address = subSocket.recvStr(ZMQ.DONTWAIT);
String contents = subSocket.recvStr(ZMQ.DONTWAIT);
System.out.println( "Notification received: "
+ address + " : "
+ contents
);
}
接下来 .send()
-s 超过 REQ
并阻止(等待 REP
响应)
鉴于 [Client]-No2
实例 也得到了一个合理的手册 .readLine()
-ed input
它将跳入相同的 while(){...}
,但它只会再次阻塞等待 REP
-响应。在 -No1
从 REP
端得到服务后,任何时候都不会得到 .recv()
-ed,所以虽然 -No1
可能已经脱离阻塞-.recv()
,而不是 -No2
(对于任何下一个 REP
端响应(可能会但不需要),而 No1
已经进行到 PUB/SUB-.recv()
,它会收到(但是 never No2
),接下来冲进下一个阻塞-input
-feed 来自 .readLine()
等等,等等,等等,...,Ad Infinitum
因此,这些 SEQ
-of-In-Loop (REQ
)-部分后跟 (SUB
)-部分无论 N > 1
的 [Client]
实例,都有效地生成了 独家滴答- Tick-Tock 时钟机,相互阻塞 PUB
-ed 在 N
[= 中的独占传送106=]-交错顺序(不讲手动,.readLine()
-驱动,阻塞步)
ZMQServerSmall
不知道有什么问题,因为它 .send()
-s 为了任何 .recvStr()
-ed 交易对手在 REQ/REP
和 PUB
-s 上发送给所有交易对手(它们不会自动读取,但只有在手动 .readLine()
解锁之后才可以(在 REQ/REP
偶发之后(可能无限阻塞)步骤)可能 .recv()
它的下一个(到目前为止还没有读取消息部分(但是,我没有看到任何代码可以显式处理 [=38 上多部分标志的存在/不存在) =]端.recv()
操作)
while (!Thread.currentThread().isInterrupted())
{
String req = rep.recvStr(0);
rep.send(req + " response");
pub.sendMore("Message header");
pub.send("Message body");;
}
同时 ZMQServerSmall
在 PUB
广播通道中发送 ( N - 1 ) 倍的消息,因此 Tick-Tock -Tick-Tock MUTEX REQ/SUB
-loop-blocking "pendulum"不是2-State,而是N
-state on the receiving sides ( all receive同样的 PUB
-ed 消息流,
yet interleaved REQ/REP
MUTEX-步进)