使用 Queue() 向特定线程发送命令

Using Queue() to send commands to specific threads

我想将消息发送到特定线程,这些线程充当服务的多个实例的控制器。这是每个线程将用于检查适用于它的消息的代码片段。评论应该解释它做得很好的地方(如果我的评论不好,请告诉我应该怎么写!)

这不是错误修复问题 - 我是线程处理的新手,想知道是否有更好的方法来执行此操作。我觉得有点乱。

def check_queue(self):
      """Gets a simple lock on the command queue, checks its size, then
      iterates through jobs, looking for one that matches its uuid.
      If there is a match, break the out
      If not, put the job back on the queue, set job equal to None, and try again
  """
  with simple_lock:
      for i in range(cmd_q.qsize()):
          self.job = cmd_q.get()
          if job[0] == self.uuid
                break
          cmd_q.push(self.job)
          self.job = None
def command_check(self)
    while not self.job:
        sleep(.5) #wait for things to happen
        self.checkQueue()
    if self.job == 'kill':
        die_gracefully()
     ...etc
#example of a what commands are being passed
def kill_service(uuid):
    job = [uuid, 1]
    cmd_queue.put(job)
    #handle clean up

任何 help/comments/critiques 将不胜感激。

编辑:simple_lock 就是 threading.Lock()。我用它来确保对 cmd_q.qsize() 的调用是准确的——否则,另一个进程可能会在调用 qsize 和完成作业之间添加到队列中。

你真的应该让每个线程都有自己独特的队列:创建一个从 uuid 映射到与该 uuid 一起的 Queue 的字典。生产者可以使用该字典来确定要推送到哪个队列,然后每个消费者都可以使用 q.get() 而无需锁定,如果它的 uuid 错误可能会重新排队等

我采用了您的示例代码并对其进行了一些扩展以演示它的外观:

from Queue import Queue
import threading

class MyThread(threading.Thread):
    cmd_queues = {}  # Map of uuid -> Queue for all instances of MyThread

    def __init__(self, uuid):
        super(threading.Thread, self).__init__()
        self.cmd_q = Queue()
        self.uuid = uuid
        MyThread.cmd_queues[uuid] = self.cmd_q

    def run(self):
        while True:
            self.command_check()

    def command_check(self):
        job = self.cmd_q.get()
        if job == 'kill':
            self.die_gracefully()
         #...etc

    def die_gracefully(self):
       # stuff

def kill_service(uuid):
    job = 1
    MyThread.cmd_queues[uuid].put(job)
    #handle clean up

if __name__ == "__main__":
    for uuid in ['123', '234', '345']:
        t = MyThread(uuid)
        t.start()

    kill_service('123')

在 Python 2.7 中,@dano 的回答(我认为这很棒)引发了运行时错误 - RuntimeError: thread.__init__() not called。我相信可以通过在调用 super.

时将 threading.Thread 替换为基础 class 名称来解决该异常。
from Queue import Queue
import threading

class MyThread(threading.Thread):
    cmd_queues = {}  # Map of uuid -> Queue for all instances of MyThread

    def __init__(self, uuid):
        super(MyThread, self).__init__()
        self.cmd_q = Queue()
        self.uuid = uuid
        MyThread.cmd_queues[uuid] = self.cmd_q

    def run(self):
        while True:
            self.command_check()

    def command_check(self):
        job = self.cmd_q.get()
        if job == 'kill':
            self.die_gracefully()
         #...etc

    def die_gracefully(self):
       # stuff
       pass

def kill_service(uuid):
    job = 1
    MyThread.cmd_queues[uuid].put(job)
    #handle clean up

if __name__ == "__main__":
    for uuid in ['123', '234', '345']:
        t = MyThread(uuid)
        t.start()

    kill_service('123')

如果这不是正确的解决方法,我们将不胜感激。