Runnable 中的 JeroMQ 订阅者
JeroMQ Subscriber in a Runnable
我正在尝试将 ZMQ 订阅者嵌入到 Runnable 中。
我能够第一次启动 Runnable,一切似乎都很好。
问题是当我中断线程并尝试启动一个新线程时,订阅者没有收到任何消息。例如:
我有一个可运行的发布者
class ZMQPublisherRunnable() extends Runnable {
override def run() {
val ZMQcontext = ZMQ.context(1)
val publisher = ZMQcontext.socket(ZMQ.PUB)
var count = 0
publisher.connect(s"tcp://127.0.0.1:16666")
while (!Thread.currentThread().isInterrupted) {
try {
println(s"PUBLISHER -> $count")
publisher.send(s"PUBLISHER -> $count")
count += 1
Thread.sleep(1000)
}
catch {
case e: Exception =>
println(e.getMessage)
publisher.disconnect(s"tcp://127.0.0.1:16666")
ZMQcontext.close()
}
}
}
}
我有一个 Subscriber Runnable:
class ZMQSubscriberRunnable1() extends Runnable {
override def run() {
println("STARTING SUBSCRIBER")
val ZMQcontext = ZMQ.context(1)
val subscriber = ZMQcontext.socket(ZMQ.SUB)
subscriber.subscribe("".getBytes)
subscriber.bind(s"tcp://127.0.0.1:16666")
while (!Thread.currentThread().isInterrupted) {
try {
println("waiting")
val mesg = new String(subscriber.recv(0))
println(s"SUBSCRIBER -> $mesg")
}
catch {
case e: Exception =>
println(e.getMessage)
subscriber.unbind("tcp://127.0.0.1:16666")
subscriber.close()
ZMQcontext.close()
}
}
}
}
我的主要代码如下所示:
object Application extends App {
val zmqPUB = new ZMQPublisherRunnable
val zmqThreadPUB = new Thread(zmqPUB, "MY_PUB")
zmqThreadPUB.setDaemon(true)
zmqThreadPUB.start()
val zmqRunnable = new ZMQSubscriberRunnable1
val zmqThread = new Thread(zmqRunnable, "MY_TEST")
zmqThread.setDaemon(true)
zmqThread.start()
Thread.sleep(10000)
zmqThread.interrupt()
zmqThread.join()
Thread.sleep(2000)
val zmqRunnable_2 = new ZMQSubscriberRunnable1
val zmqThread_2 = new Thread(zmqRunnable_2, "MY_TEST_2")
zmqThread_2.setDaemon(true)
zmqThread_2.start()
Thread.sleep(10000)
zmqThread_2.interrupt()
zmqThread_2.join()
}
我第一次启动订阅者时,我能够收到所有消息:
STARTING SUBSCRIBER
PUBLISHER -> 0
waiting
PUBLISHER -> 1
SUBSCRIBER -> PUBLISHER -> 1
waiting
PUBLISHER -> 2
SUBSCRIBER -> PUBLISHER -> 2
waiting
PUBLISHER -> 3
SUBSCRIBER -> PUBLISHER -> 3
waiting
...
一旦我中断线程并从同一个 Runnable 启动一个新线程,我就无法再阅读消息。永远等待
STARTING SUBSCRIBER
waiting
PUBLISHER -> 13
PUBLISHER -> 14
PUBLISHER -> 15
PUBLISHER -> 16
PUBLISHER -> 17
...
对我做错了什么有任何见解吗?
谢谢
JeroMQ
Thread.interrupt
不安全。
要解决此问题,您必须在调用 Thread.interrupt
之前停止 ZMQContext
- 在
Runnable
之外实例化ZMQContext
- 将
ZMQContext
作为参数传递给ZMQ Runnable
(您也可以将其用作全局变量)
- 致电
zmqContext.term()
- 致电
zmqSubThread.interrupt()
- 致电
zmqSubThread.join()
有关详细信息,请查看:https://github.com/zeromq/jeromq/issues/116
我的订阅者 Runnable 看起来像:
class ZMQSubscriberRunnable(zmqContext:ZMQ.Context, port: Int, ip: String, topic: String) extends Runnable {
override def run() {
var contextTerminated = false
val subscriber = zmqContext.socket(ZMQ.SUB)
subscriber.subscribe(topic.getBytes)
subscriber.bind(s"tcp://$ip:$port")
while (!contextTerminated && !Thread.currentThread().isInterrupted) {
try {
println(new String(subscriber.recv(0)))
}
catch {
case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode =>
contextTerminated = true
subscriber.close()
case e: Exception =>
zmqContext.term()
subscriber.close()
}
}
}
}
要中断线程:
zmqContext.term()
zmqSubThread.interrupt()
zmqSubThread.join()
我正在尝试将 ZMQ 订阅者嵌入到 Runnable 中。 我能够第一次启动 Runnable,一切似乎都很好。 问题是当我中断线程并尝试启动一个新线程时,订阅者没有收到任何消息。例如:
我有一个可运行的发布者
class ZMQPublisherRunnable() extends Runnable { override def run() { val ZMQcontext = ZMQ.context(1) val publisher = ZMQcontext.socket(ZMQ.PUB) var count = 0 publisher.connect(s"tcp://127.0.0.1:16666") while (!Thread.currentThread().isInterrupted) { try { println(s"PUBLISHER -> $count") publisher.send(s"PUBLISHER -> $count") count += 1 Thread.sleep(1000) } catch { case e: Exception => println(e.getMessage) publisher.disconnect(s"tcp://127.0.0.1:16666") ZMQcontext.close() } } } }
我有一个 Subscriber Runnable:
class ZMQSubscriberRunnable1() extends Runnable { override def run() { println("STARTING SUBSCRIBER") val ZMQcontext = ZMQ.context(1) val subscriber = ZMQcontext.socket(ZMQ.SUB) subscriber.subscribe("".getBytes) subscriber.bind(s"tcp://127.0.0.1:16666") while (!Thread.currentThread().isInterrupted) { try { println("waiting") val mesg = new String(subscriber.recv(0)) println(s"SUBSCRIBER -> $mesg") } catch { case e: Exception => println(e.getMessage) subscriber.unbind("tcp://127.0.0.1:16666") subscriber.close() ZMQcontext.close() } } } }
我的主要代码如下所示:
object Application extends App { val zmqPUB = new ZMQPublisherRunnable val zmqThreadPUB = new Thread(zmqPUB, "MY_PUB") zmqThreadPUB.setDaemon(true) zmqThreadPUB.start() val zmqRunnable = new ZMQSubscriberRunnable1 val zmqThread = new Thread(zmqRunnable, "MY_TEST") zmqThread.setDaemon(true) zmqThread.start() Thread.sleep(10000) zmqThread.interrupt() zmqThread.join() Thread.sleep(2000) val zmqRunnable_2 = new ZMQSubscriberRunnable1 val zmqThread_2 = new Thread(zmqRunnable_2, "MY_TEST_2") zmqThread_2.setDaemon(true) zmqThread_2.start() Thread.sleep(10000) zmqThread_2.interrupt() zmqThread_2.join() }
我第一次启动订阅者时,我能够收到所有消息:
STARTING SUBSCRIBER
PUBLISHER -> 0
waiting
PUBLISHER -> 1
SUBSCRIBER -> PUBLISHER -> 1
waiting
PUBLISHER -> 2
SUBSCRIBER -> PUBLISHER -> 2
waiting
PUBLISHER -> 3
SUBSCRIBER -> PUBLISHER -> 3
waiting
...
一旦我中断线程并从同一个 Runnable 启动一个新线程,我就无法再阅读消息。永远等待
STARTING SUBSCRIBER
waiting
PUBLISHER -> 13
PUBLISHER -> 14
PUBLISHER -> 15
PUBLISHER -> 16
PUBLISHER -> 17
...
对我做错了什么有任何见解吗?
谢谢
JeroMQ
Thread.interrupt
不安全。
要解决此问题,您必须在调用 Thread.interrupt
ZMQContext
- 在
Runnable
之外实例化 - 将
ZMQContext
作为参数传递给ZMQ Runnable
(您也可以将其用作全局变量) - 致电
zmqContext.term()
- 致电
zmqSubThread.interrupt()
- 致电
zmqSubThread.join()
ZMQContext
有关详细信息,请查看:https://github.com/zeromq/jeromq/issues/116
我的订阅者 Runnable 看起来像:
class ZMQSubscriberRunnable(zmqContext:ZMQ.Context, port: Int, ip: String, topic: String) extends Runnable {
override def run() {
var contextTerminated = false
val subscriber = zmqContext.socket(ZMQ.SUB)
subscriber.subscribe(topic.getBytes)
subscriber.bind(s"tcp://$ip:$port")
while (!contextTerminated && !Thread.currentThread().isInterrupted) {
try {
println(new String(subscriber.recv(0)))
}
catch {
case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode =>
contextTerminated = true
subscriber.close()
case e: Exception =>
zmqContext.term()
subscriber.close()
}
}
}
}
要中断线程:
zmqContext.term()
zmqSubThread.interrupt()
zmqSubThread.join()