为什么 zmq ( inproc:// )-connection 的顺序很重要,不像 ( tcp:// )?

Why zmq ( inproc:// )-connection's order matters, unlike for ( tcp:// )?

当以任意随机顺序启动 zmq 服务器和客户端时,通过 tcp:// 传输-class 进行通信,它们足够聪明 connect/reconnect 不管顺序如何。

但是,当尝试通过 inproc:// 传输 运行 相同时-class,我发现它只有在客户端在服务器之后启动时才有效.我们怎样才能避免这种情况?


MCVE-代码:

这里有一些 kotlin MCVE 代码示例,用于重现声明(这是众所周知的天气示例的修改版本)

server.kt - 运行 这到 运行 服务器独立

package sandbox.zmq

import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import sandbox.util.Util.sout
import java.util.*

fun main(args: Array<String>) {
   server(
      context = ZMQ.context(1),
//      publishTo = "tcp://localhost:5556"
      publishTo = "tcp://localhost:5557"
   )
}

fun server(context: Context, publishTo: String) {
   val publisher = context.socket(ZMQ.PUB)
   publisher.bind(publishTo)

   //  Initialize random number generator
   val srandom = Random(System.currentTimeMillis())
   while (!Thread.currentThread().isInterrupted) {
      //  Get values that will fool the boss
      val zipcode: Int
      val temperature: Int
      val relhumidity: Int
      zipcode = 10000 + srandom.nextInt(10)
      temperature = srandom.nextInt(215) - 80 + 1
      relhumidity = srandom.nextInt(50) + 10 + 1

      //  Send message to all subscribers
      val update = String.format("%05d %d %d", zipcode, temperature, relhumidity)
      println("server >> $update")
      publisher.send(update, 0)
      Thread.sleep(500)
   }

   publisher.close()
   context.term()
}

client.kt - 运行 客户端独立

package sandbox.zmq

import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import java.util.*

fun main(args: Array<String>) {
   client(
      context = ZMQ.context(1),
      readFrom = "tcp://localhost:5557"
   )
}

fun client(context: Context, readFrom: String) {
   //  Socket to talk to server
   println("Collecting updates from weather server")
   val subscriber = context.socket(ZMQ.SUB)
   //        subscriber.connect("tcp://localhost:");
   subscriber.connect(readFrom)

   //  Subscribe to zipcode, default is NYC, 10001
   subscriber.subscribe("".toByteArray())

   //  Process 100 updates
   var update_nbr: Int
   var total_temp: Long = 0
   update_nbr = 0
   while (update_nbr < 10000) {
      //  Use trim to remove the tailing '0' character
      val string = subscriber.recvStr(0).trim { it <= ' ' }
      println("client << $string")
      val sscanf = StringTokenizer(string, " ")
      val zipcode = Integer.valueOf(sscanf.nextToken())
      val temperature = Integer.valueOf(sscanf.nextToken())
      val relhumidity = Integer.valueOf(sscanf.nextToken())

      total_temp += temperature.toLong()
      update_nbr++

   }
   subscriber.close()
}

inproc.kt - 运行 并修改为 inproc:// 调用的样本场景

package sandbox.zmq

import org.zeromq.ZMQ
import kotlin.concurrent.thread


fun main(args: Array<String>) {
//   clientFirst()
   clientLast()
}

fun println(string: String) {
   System.out.println("${Thread.currentThread().name} : $string")
}

fun clientFirst() {

   val context = ZMQ.context(1)

   val client = thread {
      client(
         context = context,
         readFrom = "inproc://backend"
      )
   }

   // use this to maintain order
   Thread.sleep(10)

   val server = thread {
      server(
         context = context,
         publishTo = "inproc://backend"
      )
   }

   readLine()
   client.interrupt()
   server.interrupt()
}

fun clientLast() {

   val context = ZMQ.context(1)

   val server = thread {
      server(
         context = context,
         publishTo = "inproc://backend"
      )
   }

   // use this to maintain order
   Thread.sleep(10)

   val client = thread {
      client(
         context = context,
         readFrom = "inproc://backend"
      )
   }

   readLine()
   client.interrupt()
   server.interrupt()
}

Why zmq inproc:// connection order matters, unlike for tcp://?

好吧,这是 by-design 行为

考虑到本机 ZeroMQ API 警告此 by-design 行为(从那时起),问题不是问题,而是预期的 属性.

另外还要满足一个 属性:

The name [ meant an_endpoint_name in .connect("inproc://<_an_endpoint_name_>")]
must have been previously created by assigning it to at least one socket
within the same ØMQ context as the socket being connected.

较新版本的本机 ZeroMQ API(post 4.0),如果确实部署在各自的语言绑定/包装器下, 可能允许发布这些要求中的前者:

Since version 4.0 the order of zmq_bind() and zmq_connect() does not matter just like for the tcp transport type.


How can we avoid this?

好吧,更难的部分......

如果在 ZeroMQ 本机 API v4.2+ 之上还没有一个简单的方法,可以卷起袖子,re-factor pre-4.x 语言包装器/绑定,让引擎到达那里,或者,可能是,测试Martin SUSTRIK的第二个可爱child,nanomsg是否适合场景实现这个。