带有 ROUTER 和 DEALER 的 ZeroMQ 异步多线程

ZeroMQ async multithreading with ROUTER and DEALER

我想知道是否有办法让 ZeroMQ 套接字只读或只写。因为,在我看来,即使有 async/multithreading 个示例,每个线程仍然使用 recv-then-send 循环。我遇到的问题是,我想要从 ZeroMQ 套接字读取的 receiveMessage() 和写入 ZeroMQ 套接字的 sendMessage(msg)。但是这些方法中的每一个都会 运行 在另一个 class 中构造的单独线程中。这是我的代码(我使用的是 Scala 中的 jeromq):

trait ZmqProtocol extends Protocol {

  val context: ZContext = new ZContext(1)
  private val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
  private val backendSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)

  frontendSocket.bind("tcp://*:5555")
  backendSocket.bind("inproc://backend")


  new Thread(() => {

    println("Started receiving messages")
    // Connect backend to frontend via a proxy
    ZMQ.proxy(frontendSocket, backendSocket, null)

  }).start()


  override def receiveMessage(): (String, String) = {

    val inprocReadSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
    inprocReadSocket.connect("inproc://backend")

    //  The DEALER socket gives us the address envelope and message
    val msg = ZMsg.recvMsg(inprocReadSocket)

    // Message from client's REQ socket contains 3 frames: address + empty frame + request content
    // (payload)
    val address = msg.pop
    val emptyFrame = msg.pop
    val request = msg.pop

    assert(request != null)
    msg.destroy()

    println(s"RECEIVED: $request FROM: $address")

    (address.toString, request.toString)
  }

  override def sendMessage(address: String, response: String): Unit = {

    val inprocWriteSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
    inprocWriteSocket.connect("inproc://backend")

    val addressFrame = new ZFrame(address)
    val emptyFrame = new ZFrame("")
    val responseFrame = new ZFrame(response)

    addressFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
    // Sending empty frame because client expects such constructed message
    emptyFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
    responseFrame.send(inprocWriteSocket, ZFrame.REUSE)

    addressFrame.destroy()
    emptyFrame.destroy()
    responseFrame.destroy()
  }

}

下面是我将如何使用它:

class TrafficHandler(val requestQueue: LinkedBlockingQueue[(String, Message)],
                     val responseQueue: LinkedBlockingQueue[(String, String)])
  extends Protocol {

def startHandlingTraffic(): Unit = {

    new Thread(() => {

      while (true) {

        val (address, message) = receiveMessage()

        requestQueue.put((address, message))
      }
    }).start()

    new Thread(() => {
      while (true) {

        val (address, response) = responseQueue.take()
        sendMessage(address, response)
      }
    }).start()
  }

在调试过程中,我注意到我收到了消息,正确地从响应队列(并发阻塞队列)中取出了正确的目标地址,但没有成功地发送它。我深入研究了 jeromq 代码,在我看来它与身份有关,因为 outPipe 为空。我猜这是因为我没有正确的接收发送循环。

在@user3666197 回复后编辑代码有效! (虽然如果你先启动服务器,绑定和连接到 PUSHPULL 套接字需要时间)
这是使用 PUSHPULL 套接字的修改代码:

trait ZmqProtocol extends Protocol {

  val context: ZContext = new ZContext(1)

  val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
  frontendSocket.bind("tcp://*:5555")

  val requestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
  requestQueueSocket.bind("inproc://requestQueueSocket")

  val responseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
  responseQueueSocket.bind("inproc://responseQueueSocket")

  val inprocRequestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
  inprocRequestQueueSocket.connect("inproc://requestQueueSocket")

  val inprocResponseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
  inprocResponseQueueSocket.connect("inproc://responseQueueSocket")

  new Thread(() => {

    println("Started receiving messages")

    while (true) {

      val msg = ZMsg.recvMsg(frontendSocket)

      // Message from client's REQ socket contains 3 frames: address + empty frame + request content
      // (payload)
      val reqAddress = msg.pop
      val emptyFrame = msg.pop
      val reqPayload = msg.pop

      assert(reqPayload != null)
      msg.destroy()

      println(s"RECEIVED: $reqPayload FROM: $reqAddress")

      requestQueueSocket.send(s"$reqAddress;$reqPayload")

      val responseMessage = new String(responseQueueSocket.recv(0))
      val respMessageSplit = responseMessage.split(";")

      val respAddress = respMessageSplit(0)
      val respPayload = respMessageSplit(1)

      val array = new BigInteger(respAddress, 16).toByteArray
      val respAddressFrame = new ZFrame(array)
      val respEmptyFrame = new ZFrame("")
      val respPayloadFrame = new ZFrame(respPayload)

      respAddressFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
      // Sending empty frame because client expects such constructed message
      respEmptyFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
      respPayloadFrame.send(frontendSocket, ZFrame.REUSE)

      respAddressFrame.destroy()
      respEmptyFrame.destroy()
      respPayloadFrame.destroy()

    }

  }).start()


  override def receiveMessage(): (String, String) = {

    val message = new String(inprocRequestQueueSocket.recv(0))
    val messageSplit = message.split(";")

    val address = messageSplit(0)
    val payload = messageSplit(1)

    (address, payload)
  }

  override def sendMessage(address: String, response: String): Unit = {

    inprocResponseQueueSocket.send(s"$address;$response")
  }
}

如果需要,这里是客户端:

trait ZmqClientProtocol extends ClientProtocol {

  val context: ZMQ.Context = ZMQ.context(1)
  val socket: ZMQ.Socket = context.socket(ZMQ.REQ)

  println("Connecting to server")
  socket.connect("tcp://localhost:5555")

  override protected def send(message: String): String = {

    //  Ensure that the last byte of message is 0 because server is expecting a 0-terminated string
    val request = message.getBytes()

    // Send the message
    println(s"Sending request $request")
    socket.send(request, 0)

    //  Get the reply.
    val reply = socket.recv(0)

    new String(s"$message=${new String(reply)}")
  }
}

有没有办法让 ZeroMQ 套接字只读或只写?

是的,有几种方式。

a ) 使用串联的单纯形原型:PUSH/PULL 写入和 PULL/PUSH 读取
b ) 使用串联的单纯形原型:(X)PUB/(X)SUB 写入和 (X)SUB/(X)PUB 读取


...仍然使用.recv()-then-.send()循环。

好吧,这个观察更多地与实际的 socket-archetype 相关,其中一些确实需要 two-step(在其内部 FSA-s 中硬连线)排序 .recv()--.send()--...


...但是这些方法中的每一个都会 运行 在单独的线程中

好吧,挑战从这里开始:ZeroMQ 从一开始就被设计为主要 zero-sharing 以促进性能和独立性。 Zen-of-Zero是设计中有趣的设计原则。

然而,最近re-design的努力已经在API 4.2+中提出了将实现ZeroMQ套接字Access-points变为thread-safe的意愿(这违背了最初的原则share-nothing ), 所以如果要朝这个方向进行试验, 你可能会到达领土, 那行得通, 但代价是 Zen-of-Zero.

ZeroMQ 套接字 Access-point(s) 不应共享,即使可能,因为设计纯正。

最好为这样的 class 配备另一对单纯形 PUSH/PULL-s,如果你努力分离 OOP-concerns,但是你的 head-end(s) read-only-specialised + write-only-specialised 套接字将不得不处理的情况下,当一个 "remote" (超出外国 class-boundary 的抽象)ZeroMQ Socket-archetype FSA 及其设置和性能调整和 error-state(s) 和 "remote" class 将不得不安排所有这些加调解所有 message-transfers to/from 原生 ZeroMQ-socket (其中head-end(专门)classes)主要是隔离和隐藏的。

无论如何,只要设计得当,就可以做到。


ZeroMQ 资源不是廉价的可组合/一次性垃圾

一个想法:

...
override def sendMessage( address:  String,
                          response: String
                          ): Unit = {

             val inprocWriteSocket: ZMQ.Socket  = context.createSocket( ZMQ.DEALER )
                 inprocWriteSocket.connect( "inproc://backend" )
                 ...

在源代码中看起来很简单,但忽略了实际的设置开销并且还应尊重这样一个事实,即没有套接字(inproc://-transport-class 作为一个特例 ) 在 Context() 中实例化的那一微秒内获得 RTO ( Ready-To-Operate ),毕竟完全 .connect()-ed 和 RTO-ed 更少与远程交易对手握手,所以最好事先设置好 SIG/MSG-infrastructure 并将其作为 semi-persistent 通信层保持最佳状态,而不是任何 ad-hoc / just-in-time 启动 composable/disposable...(资源生态学)


inproc://-transport-class 还有一项要求 pre-API 4.x:

Connecting a socket

When connecting a socket to a peer address using zmq_connect() with the inproc:// transport, the endpoint shall be interpreted as an arbitrary string identifying the name to connect to. Before version 4.0 the name must have been previously created by assigning it to at least one socket within the same ØMQ context as the socket being connected. Since version 4.0 the order of zmq_bind() and zmq_connect() does not matter just like for the tcp:// transport type.

因此,在某些情况下,当您的部署不确定实际的本地主机 API 版本时,请注意执行 .bind() / .connect() 的正确顺序,否则 inproc:// 管道不适合你。