同一台机器上的 ZeroMQ PUB/SUB 拓扑

ZeroMQ PUB/SUB topology on the same machine

发布者是否可以使用 ZeroMQ 向同一台机器上的多个客户端发布消息?我想要一组客户端,每个客户端都可以使用 SocketType.REQ 和 SocketType.REP 进行标准 Request/Response 调用,但也可以使用 SocketType.SUB 和 SocketType.PUB.

接收通知

虽然我的版本只有一个发布者,但我已尝试实现此拓扑,取自 here

这是我的发布者:

public class ZMQServerSmall 
{   
    public static void main(String[] args)
    {
        try (ZContext context = new ZContext()) 
        {           
            ZMQ.Socket rep = context.createSocket(SocketType.REP);
            rep.bind("tcp://*:5555");

            ZMQ.Socket pub = context.createSocket(SocketType.PUB);
            pub.bind("tcp://*:7777");           

            while (!Thread.currentThread().isInterrupted()) 
            {                   
                String req = rep.recvStr(0);
                rep.send(req + " response");

                pub.sendMore("Message header");
                pub.send("Message body");;          
            }
        }
    }
}

这是我的代理(我包含了一个监听器来尝试查看发生了什么):

public class ZMQForwarderSmall 
{
    public static void main(String[] args) 
    {       
        try 
        (
            ZContext context = new ZContext();   
        )
        {
            ZMQ.Socket frontend = context.createSocket(SocketType.XSUB);            
            frontend.connect("tcp://*:7777");

            ZMQ.Socket backend = context.createSocket(SocketType.XPUB);
            backend.bind("tcp://*:6666");

            IAttachedRunnable runnable = new Listener();
            Socket listener = ZThread.fork(context, runnable);

            ZMQ.proxy(frontend, backend, listener);
        }
        catch (Exception e) 
        {
            System.err.println(e.getMessage());
        } 
    }

    private static class Listener implements IAttachedRunnable 
    {    
        @Override
        public void run(Object[] args, ZContext ctx, Socket pipe) 
        {
            while (true) 
            {
                ZFrame frame = ZFrame.recvFrame(pipe);
                if (frame == null)
                    break; // Interrupted
                System.out.println(frame.toString());
                frame.destroy();
            }
        }
    }
}

这是我的订阅者:

public class ZMQClientSmall
{   
    public static void main(String[] args) throws IOException
    {
        String input;

        try 
        (
            ZContext context = new ZContext();
            BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))
        ) 
        { 
            ZMQ.Socket reqSocket = context.createSocket(SocketType.REQ);
            reqSocket.connect("tcp://localhost:5555");

            ZMQ.Socket subSocket = context.createSocket(SocketType.SUB);
            subSocket.connect("tcp://localhost:6666");

            subSocket.subscribe("".getBytes(ZMQ.CHARSET));

            while ((input = stdIn.readLine()) != null)
            {
                reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
                String response = reqSocket.recvStr(0);

                String address = subSocket.recvStr(ZMQ.DONTWAIT);
                String contents = subSocket.recvStr(ZMQ.DONTWAIT);
                System.out.println("Notification received: " + address + " : " + contents);
            }
        }   
    }   
}

这是测试。我打开四个终端; 1 个发布者、1 个代理和 2 个客户端。当我在两个客户端中的任何一个中发出请求时,我希望在两个客户端中都看到通知,但我只在发出请求的终端中看到通知。我知道两个客户端都使用相同的地址 (localhost:6666),但我希望代理能够解决该问题。

谁能看出这里有什么明显的错误吗?

Q : Is it possible for a publisher to publish to multiple clients on the same machine using ZeroMQ?

当然,是的。毫无疑问。


检查代码。执行顺序的责任在那里。在中总是如此。

一旦 [Client]-No1 实例 得到合理的 .readLine()-ed input 它将跳入:

        while ((input = stdIn.readLine()) != null)
        {
            reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
            String response = reqSocket.recvStr(0);

            String address = subSocket.recvStr(ZMQ.DONTWAIT);
            String contents = subSocket.recvStr(ZMQ.DONTWAIT);
            System.out.println(           "Notification received: "
                              + address + " : "
                              + contents
                                );
        }

接下来 .send()-s 超过 REQ 并阻止(等待 REP 响应)

鉴于 [Client]-No2 实例 也得到了一个合理的手册 .readLine()-ed input 它将跳入相同的 while(){...},但它只会再次阻塞等待 REP-响应。在 -No1REP 端得到服务后,任何时候都不会得到 .recv()-ed,所以虽然 -No1 可能已经脱离阻塞-.recv(),而不是 -No2(对于任何下一个 REP 端响应(可能会但不需要),而 No1 已经进行到 PUB/SUB-.recv() ,它会收到(但是 never No2 ),接下来冲进下一个阻塞-input-feed 来自 .readLine() 等等,等等,等等,...,Ad Infinitum

因此,这些 SEQ-of-In-Loop (REQ)-部分后跟 (SUB)-部分无论 N > 1[Client] 实例,都有效地生成了 独家滴答- Tick-Tock 时钟机,相互阻塞 PUB-ed 在 N[= 中的独占传送106=]-交错顺序(不讲手动,.readLine()-驱动,阻塞步)

ZMQServerSmall 不知道有什么问题,因为它 .send()-s 为了任何 .recvStr()-ed 交易对手在 REQ/REPPUB-s 上发送给所有交易对手(它们不会自动读取,但只有在手动 .readLine() 解锁之后才可以(在 REQ/REP 偶发之后(可能无限阻塞)步骤)可能 .recv() 它的下一个(到目前为止还没有读取消息部分(但是,我没有看到任何代码可以显式处理 [=38 上多部分标志的存在/不存在) =]端.recv()操作)

        while (!Thread.currentThread().isInterrupted()) 
        {                   
            String req = rep.recvStr(0);
            rep.send(req + " response");

            pub.sendMore("Message header");
            pub.send("Message body");;          
        }

同时 ZMQServerSmallPUB 广播通道中发送 ( N - 1 ) 倍的消息,因此 Tick-Tock -Tick-Tock MUTEX REQ/SUB-loop-blocking "pendulum"不是2-State,而是N-state on the receiving sides ( all receive同样的 PUB-ed 消息流,
yet interleaved REQ/REP MUTEX-步进)