JeroMQ Subscriber in a Runnable

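I am trying to embed a ZMQ subscriber in a Runnable. I can start the Runnable the first time and everything seems fine. The problem is that when I interrupt the thread and try to start a new one, the subscriber does not receive any messages. For example: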

  1. I have a publisher Runnable:

    class ZMQPublisherRunnable() extends Runnable {
    
     override def run() {
       val ZMQcontext = ZMQ.context(1)
       val publisher = ZMQcontext.socket(ZMQ.PUB)
       var count = 0
    
       publisher.connect(s"tcp://127.0.0.1:16666")
    
       while (!Thread.currentThread().isInterrupted) {
         try {
           println(s"PUBLISHER -> $count")
           publisher.send(s"PUBLISHER -> $count")
           count += 1
           Thread.sleep(1000)
         }
         catch {
           case e: Exception =>
           println(e.getMessage)
           publisher.disconnect(s"tcp://127.0.0.1:16666")
           ZMQcontext.close()
         }
       }
     }
    }
    
  2. I have a subscriber Runnable:

    class ZMQSubscriberRunnable1() extends Runnable {
    
      override def run() {
    
        println("STARTING SUBSCRIBER")
    
        val ZMQcontext = ZMQ.context(1)
        val subscriber = ZMQcontext.socket(ZMQ.SUB)
        subscriber.subscribe("".getBytes)
    
        subscriber.bind(s"tcp://127.0.0.1:16666")
    
        while (!Thread.currentThread().isInterrupted) {
          try {
            println("waiting")
            val mesg = new String(subscriber.recv(0))
            println(s"SUBSCRIBER -> $mesg")
          }
          catch {
            case e: Exception =>
              println(e.getMessage)
              subscriber.unbind("tcp://127.0.0.1:16666")
              subscriber.close()
              ZMQcontext.close()
          }
        }
      }
    }
    
  3. My main code looks like this:

    object Application extends App {
      val zmqPUB = new ZMQPublisherRunnable
      val zmqThreadPUB = new Thread(zmqPUB, "MY_PUB")
    
      zmqThreadPUB.setDaemon(true)
      zmqThreadPUB.start()
    
      val zmqRunnable = new ZMQSubscriberRunnable1
      val zmqThread = new Thread(zmqRunnable, "MY_TEST")
    
      zmqThread.setDaemon(true)
      zmqThread.start()
    
      Thread.sleep(10000)
    
      zmqThread.interrupt()
      zmqThread.join()
    
      Thread.sleep(2000)
    
      val zmqRunnable_2 = new ZMQSubscriberRunnable1
      val zmqThread_2 = new Thread(zmqRunnable_2, "MY_TEST_2")
    
      zmqThread_2.setDaemon(true)
      zmqThread_2.start()
    
      Thread.sleep(10000)
    
      zmqThread_2.interrupt()
      zmqThread_2.join()
    }
    

The first time I start the subscriber, I receive all the messages:

    STARTING SUBSCRIBER
    PUBLISHER -> 0
    waiting
    PUBLISHER -> 1
    SUBSCRIBER -> PUBLISHER -> 1
    waiting
    PUBLISHER -> 2
    SUBSCRIBER -> PUBLISHER -> 2
    waiting
    PUBLISHER -> 3
    SUBSCRIBER -> PUBLISHER -> 3
    waiting
    ...

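Once I interrupt the thread and start a new thread from the same Runnable class, I can no longer read any messages. The subscriber waits forever: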

    STARTING SUBSCRIBER
    waiting
    PUBLISHER -> 13
    PUBLISHER -> 14
    PUBLISHER -> 15
    PUBLISHER -> 16
    PUBLISHER -> 17
    ...

Any insight into what I am doing wrong?

Thanks

JeroMQ is not Thread.interrupt safe.

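To work around this, you have to stop the ZMQContext before calling Thread.interrupt:

  1. Instantiate the ZMQContext outside the Runnable
  2. Pass the ZMQContext as an argument to the ZMQ Runnable (you can also use it as a global variable)
  3. Call zmqContext.term()
  4. Call zmqSubThread.interrupt()
  5. Call zmqSubThread.join()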

For more details, see: https://github.com/zeromq/jeromq/issues/116

My subscriber Runnable looks like:

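    import org.zeromq.{ZMQ, ZMQException}

    class ZMQSubscriberRunnable(zmqContext: ZMQ.Context, port: Int, ip: String, topic: String) extends Runnable {

      override def run() {

        // Set once the socket has been closed, so the loop stops calling recv on it.
        var contextTerminated = false
        val subscriber = zmqContext.socket(ZMQ.SUB)
        subscriber.subscribe(topic.getBytes)

        subscriber.bind(s"tcp://$ip:$port")

        while (!contextTerminated && !Thread.currentThread().isInterrupted) {
          try {
            println(new String(subscriber.recv(0)))
          }
          catch {
            // zmqContext.term() was called from another thread: recv fails with ETERM.
            case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode =>
              contextTerminated = true
              subscriber.close()
            case e: Exception =>
              // Close the socket first: term() blocks until every socket on the context is closed.
              contextTerminated = true
              subscriber.close()
              zmqContext.term()
          }
        }
      }
    }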

To interrupt the thread:

    zmqContext.term()
    zmqSubThread.interrupt()
    zmqSubThread.join()
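For the scenario in the question (stop one subscriber, then start a second one on the same endpoint), the steps above could be put together roughly as follows. This is only a sketch under some assumptions: `RestartableSubscriberDemo`, `Subscriber`, and `runSubscriberFor` are hypothetical names, the subscriber is a reduced version of the `ZMQSubscriberRunnable` above, and a publisher such as the question's `ZMQPublisherRunnable` is assumed to be running separately. Each subscriber gets its own context here, on the assumption that a terminated context should not be reused for new sockets.

```scala
import org.zeromq.{ZMQ, ZMQException}

// Hypothetical demo object; names are illustrative, not part of JeroMQ.
object RestartableSubscriberDemo {

  // Reduced subscriber: receives until its context is terminated.
  class Subscriber(ctx: ZMQ.Context, endpoint: String) extends Runnable {
    override def run(): Unit = {
      val socket = ctx.socket(ZMQ.SUB)
      socket.subscribe("".getBytes)
      socket.bind(endpoint)
      var running = true
      while (running) {
        try {
          println(s"SUBSCRIBER -> ${new String(socket.recv(0))}")
        } catch {
          // term() on the context makes the blocking recv fail with ETERM.
          case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode =>
            running = false
        }
      }
      // Closing the socket lets the pending term() call return.
      socket.close()
    }
  }

  // Runs one subscriber for `millis` milliseconds, then shuts it down
  // in the order given above: term, interrupt, join.
  def runSubscriberFor(millis: Long, endpoint: String): Unit = {
    val ctx = ZMQ.context(1) // created outside the Runnable
    val thread = new Thread(new Subscriber(ctx, endpoint), "subscriber")
    thread.start()
    Thread.sleep(millis)
    ctx.term() // blocks until the subscriber has closed its socket
    thread.interrupt()
    thread.join()
  }

  def main(args: Array[String]): Unit = {
    val endpoint = "tcp://127.0.0.1:16666"
    runSubscriberFor(10000, endpoint)
    Thread.sleep(2000)
    runSubscriberFor(10000, endpoint) // fresh context and socket for the second run
  }
}
```

The publisher keeps its own context in this sketch, so terminating a subscriber's context does not affect it.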