DEALER 到 ROUTER - 故障信息
DEALER to ROUTER - out of order messages
作为 ROUTER to DEALER 的后续行动,我有一个 DEALER
正在与一个 ROUTER
:
通话
简而言之,我为每个 DEALER
创建了 2 个线程。 DEALER
的生命周期是:
- 发送
READY
到ROUTER
表明ROUTER
可以信任来自它的KILL
信号
- 等待来自
ROUTER
的 ACK
- 发送 5 条消息
发送END
消息以便ROUTER
退出
套餐net.async
import org.zeromq.ZMQ
import org.zeromq.ZMQ.Socket
import scala.annotation.tailrec
object Client {
val Empty = "".getBytes
def message(x: Int) = s"HELLO_#$x".getBytes
val Count = 5
}
class Client(name: String) extends Runnable {
import Client._
import AsyncClientServer._
override def run(): Unit = {
val context = ZMQ.context(1)
val dealer = context.socket(ZMQ.DEALER)
dealer.setIdentity(name.getBytes)
dealer.connect(s"tcp://localhost:$Port")
initiate(dealer)
}
private def initiate(dealer: Socket): Unit = {
dealer.send("".getBytes, ZMQ.SNDMORE)
dealer.send("READY".getBytes, 0)
val reply = new String(dealer.recv(0))
println(s"DEALER: ${new String(dealer.getIdentity)} received $reply")
if(reply == Ack) {println("DEALER: received ACK!"); runHelper(dealer, Count)}
else initiate(dealer)
}
@tailrec
private def runHelper(dealer: Socket, count: Int): Unit = {
val msg = if(count <= 0 ) End.getBytes else message(count)
dealer.send(msg, 0)
val id = new String(dealer.getIdentity)
println(s"DEALER ${id} sent message: ${new String(msg)}.")
// println(s"Dealer: ${dealer.getIdentity} received message: " + new String(dealer.recv(0)))
runHelper(dealer, count - 1)
}
}
object AsyncClientServer {
val End = "END"
val Ack = "WORLD"
val Port = 5555
val ClientReady = "READY"
val Empty = "".getBytes
val context = ZMQ.context(1)
val router = context.socket(ZMQ.ROUTER)
def main(args: Array[String]): Unit = {
router.bind(s"tcp://*:$Port")
new Thread(new Client("JOE")).start()
//new Thread(new Client("JILL")).start()
mainHelper(List.empty)
}
private def mainHelper(activeClients: List[String]): Unit = {
val identity = new String( router.recv(0) )
println(s"ROUTER: Received message from $identity.")
val empty = router.recv(0)
println("ROUTER: received empty: " + new String(empty))
val message = new String( router.recv(0) )
println(s"ROUTER: received message: $message")
checkMessage(identity, message, activeClients) match {
case Normal(msg) => mainHelper(activeClients)
case Ready(id) => ackDealer(router, id); mainHelper(id :: activeClients)
case Kill => sys.exit(0)
case UnknownIdentity => mainHelper(activeClients)
}
}
private def ackDealer(router: Socket, identity: String): Unit = {
router.send(identity.getBytes, ZMQ.SNDMORE)
router.send(Empty, ZMQ.SNDMORE)
router.send(Ack.getBytes, 0)
}
private def checkMessage(identity: String, message: String, activeClients: List[String]): Message = {
if(message == ClientReady) Ready(identity)
else {
activeClients.find(_ == identity) match {
case Some(_) =>
if (message == End) Kill
else Normal(message)
case None => UnknownIdentity
}
}
}
sealed trait Message
case class Normal(value: String) extends Message
case class Ready(id: String) extends Message
case object Kill extends Message
case object UnknownIdentity extends Message
}
但是,ROUTER
接收消息的顺序似乎有问题:
[info] Running net.async.AsyncClientServer
[info] ROUTER: Received message from JOE.
[info] ROUTER: received empty:
[info] ROUTER: received message: READY
[info] DEALER: JOE received
[info] DEALER: JOE received WORLD
[info] DEALER: received ACK!
[info] ROUTER: Received message from JOE.
[info] ROUTER: received empty:
[info] ROUTER: received message: READY
[info] DEALER JOE sent message: HELLO_#5.
[info] DEALER JOE sent message: HELLO_#4.
[info] DEALER JOE sent message: HELLO_#3.
[info] DEALER JOE sent message: HELLO_#2.
[info] ROUTER: Received message from JOE.
[info] DEALER JOE sent message: HELLO_#1.
[info] ROUTER: received empty: HELLO_#5
最后一个 [info]
表明 ROUTER
收到了 HELLO_#5
应该是空的。这是为什么?
在 initiate
中,您正确地发送了一个空帧,然后是内容
dealer.send("".getBytes, ZMQ.SNDMORE)
dealer.send("READY".getBytes, 0)
但是在runHelper
中你只发送内容
dealer.send(msg, 0)
作为 ROUTER to DEALER 的后续行动,我有一个 DEALER
正在与一个 ROUTER
:
简而言之,我为每个 DEALER
创建了 2 个线程。 DEALER
的生命周期是:
- 发送
READY
到ROUTER
表明ROUTER
可以信任来自它的KILL
信号 - 等待来自
ROUTER
的 ACK
- 发送 5 条消息
发送
END
消息以便ROUTER
退出套餐net.async
import org.zeromq.ZMQ import org.zeromq.ZMQ.Socket import scala.annotation.tailrec object Client { val Empty = "".getBytes def message(x: Int) = s"HELLO_#$x".getBytes val Count = 5 } class Client(name: String) extends Runnable { import Client._ import AsyncClientServer._ override def run(): Unit = { val context = ZMQ.context(1) val dealer = context.socket(ZMQ.DEALER) dealer.setIdentity(name.getBytes) dealer.connect(s"tcp://localhost:$Port") initiate(dealer) } private def initiate(dealer: Socket): Unit = { dealer.send("".getBytes, ZMQ.SNDMORE) dealer.send("READY".getBytes, 0) val reply = new String(dealer.recv(0)) println(s"DEALER: ${new String(dealer.getIdentity)} received $reply") if(reply == Ack) {println("DEALER: received ACK!"); runHelper(dealer, Count)} else initiate(dealer) } @tailrec private def runHelper(dealer: Socket, count: Int): Unit = { val msg = if(count <= 0 ) End.getBytes else message(count) dealer.send(msg, 0) val id = new String(dealer.getIdentity) println(s"DEALER ${id} sent message: ${new String(msg)}.") // println(s"Dealer: ${dealer.getIdentity} received message: " + new String(dealer.recv(0))) runHelper(dealer, count - 1) } } object AsyncClientServer { val End = "END" val Ack = "WORLD" val Port = 5555 val ClientReady = "READY" val Empty = "".getBytes val context = ZMQ.context(1) val router = context.socket(ZMQ.ROUTER) def main(args: Array[String]): Unit = { router.bind(s"tcp://*:$Port") new Thread(new Client("JOE")).start() //new Thread(new Client("JILL")).start() mainHelper(List.empty) } private def mainHelper(activeClients: List[String]): Unit = { val identity = new String( router.recv(0) ) println(s"ROUTER: Received message from $identity.") val empty = router.recv(0) println("ROUTER: received empty: " + new String(empty)) val message = new String( router.recv(0) ) println(s"ROUTER: received message: $message") checkMessage(identity, message, activeClients) match { case Normal(msg) => mainHelper(activeClients) case Ready(id) => ackDealer(router, id); mainHelper(id :: activeClients) case Kill => sys.exit(0) case UnknownIdentity => mainHelper(activeClients) } } private def ackDealer(router: Socket, identity: String): Unit = { router.send(identity.getBytes, ZMQ.SNDMORE) router.send(Empty, ZMQ.SNDMORE) router.send(Ack.getBytes, 0) } private def checkMessage(identity: String, message: String, activeClients: List[String]): Message = { if(message == ClientReady) Ready(identity) else { activeClients.find(_ == identity) match { case Some(_) => if (message == End) Kill else Normal(message) case None => UnknownIdentity } } } sealed trait Message case class Normal(value: String) extends Message case class Ready(id: String) extends Message case object Kill extends Message case object UnknownIdentity extends Message }
但是,ROUTER
接收消息的顺序似乎有问题:
[info] Running net.async.AsyncClientServer
[info] ROUTER: Received message from JOE.
[info] ROUTER: received empty:
[info] ROUTER: received message: READY
[info] DEALER: JOE received
[info] DEALER: JOE received WORLD
[info] DEALER: received ACK!
[info] ROUTER: Received message from JOE.
[info] ROUTER: received empty:
[info] ROUTER: received message: READY
[info] DEALER JOE sent message: HELLO_#5.
[info] DEALER JOE sent message: HELLO_#4.
[info] DEALER JOE sent message: HELLO_#3.
[info] DEALER JOE sent message: HELLO_#2.
[info] ROUTER: Received message from JOE.
[info] DEALER JOE sent message: HELLO_#1.
[info] ROUTER: received empty: HELLO_#5
最后一个 [info]
表明 ROUTER
收到了 HELLO_#5
应该是空的。这是为什么?
在 initiate
中,您正确地发送了一个空帧,然后是内容
dealer.send("".getBytes, ZMQ.SNDMORE)
dealer.send("READY".getBytes, 0)
但是在runHelper
中你只发送内容
dealer.send(msg, 0)