为什么 zmq ( inproc:// )-connection 的顺序很重要,不像 ( tcp:// )?
Why zmq ( inproc:// )-connection's order matters, unlike for ( tcp:// )?
当以任意随机顺序启动 zmq 服务器和客户端时,通过 tcp://
传输-class 进行通信,它们足够聪明 connect/reconnect 不管顺序如何。
但是,当尝试通过 inproc://
传输 运行 相同时-class,我发现它只有在客户端在服务器之后启动时才有效.我们怎样才能避免这种情况?
MCVE-代码:
这里有一些 kotlin MCVE 代码示例,用于重现声明(这是众所周知的天气示例的修改版本)
server.kt
- 运行 这到 运行 服务器独立
package sandbox.zmq
import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import sandbox.util.Util.sout
import java.util.*
fun main(args: Array<String>) {
server(
context = ZMQ.context(1),
// publishTo = "tcp://localhost:5556"
publishTo = "tcp://localhost:5557"
)
}
fun server(context: Context, publishTo: String) {
val publisher = context.socket(ZMQ.PUB)
publisher.bind(publishTo)
// Initialize random number generator
val srandom = Random(System.currentTimeMillis())
while (!Thread.currentThread().isInterrupted) {
// Get values that will fool the boss
val zipcode: Int
val temperature: Int
val relhumidity: Int
zipcode = 10000 + srandom.nextInt(10)
temperature = srandom.nextInt(215) - 80 + 1
relhumidity = srandom.nextInt(50) + 10 + 1
// Send message to all subscribers
val update = String.format("%05d %d %d", zipcode, temperature, relhumidity)
println("server >> $update")
publisher.send(update, 0)
Thread.sleep(500)
}
publisher.close()
context.term()
}
client.kt
- 运行 客户端独立
package sandbox.zmq
import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import java.util.*
fun main(args: Array<String>) {
client(
context = ZMQ.context(1),
readFrom = "tcp://localhost:5557"
)
}
fun client(context: Context, readFrom: String) {
// Socket to talk to server
println("Collecting updates from weather server")
val subscriber = context.socket(ZMQ.SUB)
// subscriber.connect("tcp://localhost:");
subscriber.connect(readFrom)
// Subscribe to zipcode, default is NYC, 10001
subscriber.subscribe("".toByteArray())
// Process 100 updates
var update_nbr: Int
var total_temp: Long = 0
update_nbr = 0
while (update_nbr < 10000) {
// Use trim to remove the tailing '0' character
val string = subscriber.recvStr(0).trim { it <= ' ' }
println("client << $string")
val sscanf = StringTokenizer(string, " ")
val zipcode = Integer.valueOf(sscanf.nextToken())
val temperature = Integer.valueOf(sscanf.nextToken())
val relhumidity = Integer.valueOf(sscanf.nextToken())
total_temp += temperature.toLong()
update_nbr++
}
subscriber.close()
}
inproc.kt
- 运行 并修改为 inproc://
调用的样本场景
package sandbox.zmq
import org.zeromq.ZMQ
import kotlin.concurrent.thread
fun main(args: Array<String>) {
// clientFirst()
clientLast()
}
fun println(string: String) {
System.out.println("${Thread.currentThread().name} : $string")
}
fun clientFirst() {
val context = ZMQ.context(1)
val client = thread {
client(
context = context,
readFrom = "inproc://backend"
)
}
// use this to maintain order
Thread.sleep(10)
val server = thread {
server(
context = context,
publishTo = "inproc://backend"
)
}
readLine()
client.interrupt()
server.interrupt()
}
fun clientLast() {
val context = ZMQ.context(1)
val server = thread {
server(
context = context,
publishTo = "inproc://backend"
)
}
// use this to maintain order
Thread.sleep(10)
val client = thread {
client(
context = context,
readFrom = "inproc://backend"
)
}
readLine()
client.interrupt()
server.interrupt()
}
Why zmq inproc://
connection order matters, unlike for tcp://
?
好吧,这是 by-design 行为
考虑到本机 ZeroMQ API 警告此 by-design 行为(从那时起),问题不是问题,而是预期的 属性.
另外还要满足一个 属性:
The name [ meant an_endpoint_name
in .connect("inproc://<_an_endpoint_name_>")
]
must have been previously created by assigning it to at least one socket
within the same ØMQ context as the socket being connected.
较新版本的本机 ZeroMQ API(post 4.0),如果确实部署在各自的语言绑定/包装器下, 可能允许发布这些要求中的前者:
Since version 4.0 the order of zmq_bind()
and zmq_connect()
does not matter just like for the tcp transport type.
How can we avoid this?
好吧,更难的部分......
如果在 ZeroMQ 本机 API v4.2+ 之上还没有一个简单的方法,可以卷起袖子,re-factor pre-4.x 语言包装器/绑定,让引擎到达那里,或者,可能是,测试Martin SUSTRIK的第二个可爱child,nanomsg
是否适合场景实现这个。
当以任意随机顺序启动 zmq 服务器和客户端时,通过 tcp://
传输-class 进行通信,它们足够聪明 connect/reconnect 不管顺序如何。
但是,当尝试通过 inproc://
传输 运行 相同时-class,我发现它只有在客户端在服务器之后启动时才有效.我们怎样才能避免这种情况?
MCVE-代码:
这里有一些 kotlin MCVE 代码示例,用于重现声明(这是众所周知的天气示例的修改版本)
server.kt
- 运行 这到 运行 服务器独立
package sandbox.zmq
import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import sandbox.util.Util.sout
import java.util.*
fun main(args: Array<String>) {
server(
context = ZMQ.context(1),
// publishTo = "tcp://localhost:5556"
publishTo = "tcp://localhost:5557"
)
}
fun server(context: Context, publishTo: String) {
val publisher = context.socket(ZMQ.PUB)
publisher.bind(publishTo)
// Initialize random number generator
val srandom = Random(System.currentTimeMillis())
while (!Thread.currentThread().isInterrupted) {
// Get values that will fool the boss
val zipcode: Int
val temperature: Int
val relhumidity: Int
zipcode = 10000 + srandom.nextInt(10)
temperature = srandom.nextInt(215) - 80 + 1
relhumidity = srandom.nextInt(50) + 10 + 1
// Send message to all subscribers
val update = String.format("%05d %d %d", zipcode, temperature, relhumidity)
println("server >> $update")
publisher.send(update, 0)
Thread.sleep(500)
}
publisher.close()
context.term()
}
client.kt
- 运行 客户端独立
package sandbox.zmq
import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import java.util.*
fun main(args: Array<String>) {
client(
context = ZMQ.context(1),
readFrom = "tcp://localhost:5557"
)
}
fun client(context: Context, readFrom: String) {
// Socket to talk to server
println("Collecting updates from weather server")
val subscriber = context.socket(ZMQ.SUB)
// subscriber.connect("tcp://localhost:");
subscriber.connect(readFrom)
// Subscribe to zipcode, default is NYC, 10001
subscriber.subscribe("".toByteArray())
// Process 100 updates
var update_nbr: Int
var total_temp: Long = 0
update_nbr = 0
while (update_nbr < 10000) {
// Use trim to remove the tailing '0' character
val string = subscriber.recvStr(0).trim { it <= ' ' }
println("client << $string")
val sscanf = StringTokenizer(string, " ")
val zipcode = Integer.valueOf(sscanf.nextToken())
val temperature = Integer.valueOf(sscanf.nextToken())
val relhumidity = Integer.valueOf(sscanf.nextToken())
total_temp += temperature.toLong()
update_nbr++
}
subscriber.close()
}
inproc.kt
- 运行 并修改为 inproc://
调用的样本场景
package sandbox.zmq
import org.zeromq.ZMQ
import kotlin.concurrent.thread
fun main(args: Array<String>) {
// clientFirst()
clientLast()
}
fun println(string: String) {
System.out.println("${Thread.currentThread().name} : $string")
}
fun clientFirst() {
val context = ZMQ.context(1)
val client = thread {
client(
context = context,
readFrom = "inproc://backend"
)
}
// use this to maintain order
Thread.sleep(10)
val server = thread {
server(
context = context,
publishTo = "inproc://backend"
)
}
readLine()
client.interrupt()
server.interrupt()
}
fun clientLast() {
val context = ZMQ.context(1)
val server = thread {
server(
context = context,
publishTo = "inproc://backend"
)
}
// use this to maintain order
Thread.sleep(10)
val client = thread {
client(
context = context,
readFrom = "inproc://backend"
)
}
readLine()
client.interrupt()
server.interrupt()
}
Why zmq
inproc://
connection order matters, unlike fortcp://
?
好吧,这是 by-design 行为
考虑到本机 ZeroMQ API 警告此 by-design 行为(从那时起),问题不是问题,而是预期的 属性.
另外还要满足一个 属性:
The name [ meant
an_endpoint_name
in.connect("inproc://<_an_endpoint_name_>")
]
must have been previously created by assigning it to at least one socket
within the same ØMQ context as the socket being connected.
较新版本的本机 ZeroMQ API(post 4.0),如果确实部署在各自的语言绑定/包装器下, 可能允许发布这些要求中的前者:
Since version 4.0 the order of
zmq_bind()
andzmq_connect()
does not matter just like for the tcp transport type.
How can we avoid this?
好吧,更难的部分......
如果在 ZeroMQ 本机 API v4.2+ 之上还没有一个简单的方法,可以卷起袖子,re-factor pre-4.x 语言包装器/绑定,让引擎到达那里,或者,可能是,测试Martin SUSTRIK的第二个可爱child,nanomsg
是否适合场景实现这个。