使用 Queue() 向特定线程发送命令
Using Queue() to send commands to specific threads
我想将消息发送到特定线程,这些线程充当服务的多个实例的控制器。这是每个线程将用于检查适用于它的消息的代码片段。评论应该解释它做得很好的地方(如果我的评论不好,请告诉我应该怎么写!)
这不是错误修复问题 - 我是线程处理的新手,想知道是否有更好的方法来执行此操作。我觉得有点乱。
def check_queue(self):
"""Gets a simple lock on the command queue, checks its size, then
iterates through jobs, looking for one that matches its uuid.
If there is a match, break the out
If not, put the job back on the queue, set job equal to None, and try again
"""
with simple_lock:
for i in range(cmd_q.qsize()):
self.job = cmd_q.get()
if job[0] == self.uuid
break
cmd_q.push(self.job)
self.job = None
def command_check(self)
while not self.job:
sleep(.5) #wait for things to happen
self.checkQueue()
if self.job == 'kill':
die_gracefully()
...etc
#example of a what commands are being passed
def kill_service(uuid):
job = [uuid, 1]
cmd_queue.put(job)
#handle clean up
任何 help/comments/critiques 将不胜感激。
编辑:simple_lock 就是 threading.Lock()。我用它来确保对 cmd_q.qsize() 的调用是准确的——否则,另一个进程可能会在调用 qsize 和完成作业之间添加到队列中。
你真的应该让每个线程都有自己独特的队列:创建一个从 uuid 映射到与该 uuid 一起的 Queue
的字典。生产者可以使用该字典来确定要推送到哪个队列,然后每个消费者都可以使用 q.get()
而无需锁定,如果它的 uuid 错误可能会重新排队等
我采用了您的示例代码并对其进行了一些扩展以演示它的外观:
from Queue import Queue
import threading
class MyThread(threading.Thread):
cmd_queues = {} # Map of uuid -> Queue for all instances of MyThread
def __init__(self, uuid):
super(threading.Thread, self).__init__()
self.cmd_q = Queue()
self.uuid = uuid
MyThread.cmd_queues[uuid] = self.cmd_q
def run(self):
while True:
self.command_check()
def command_check(self):
job = self.cmd_q.get()
if job == 'kill':
self.die_gracefully()
#...etc
def die_gracefully(self):
# stuff
def kill_service(uuid):
job = 1
MyThread.cmd_queues[uuid].put(job)
#handle clean up
if __name__ == "__main__":
for uuid in ['123', '234', '345']:
t = MyThread(uuid)
t.start()
kill_service('123')
在 Python 2.7 中,@dano 的回答(我认为这很棒)引发了运行时错误 - RuntimeError: thread.__init__() not called
。我相信可以通过在调用 super.
时将 threading.Thread
替换为基础 class 名称来解决该异常。
from Queue import Queue
import threading
class MyThread(threading.Thread):
cmd_queues = {} # Map of uuid -> Queue for all instances of MyThread
def __init__(self, uuid):
super(MyThread, self).__init__()
self.cmd_q = Queue()
self.uuid = uuid
MyThread.cmd_queues[uuid] = self.cmd_q
def run(self):
while True:
self.command_check()
def command_check(self):
job = self.cmd_q.get()
if job == 'kill':
self.die_gracefully()
#...etc
def die_gracefully(self):
# stuff
pass
def kill_service(uuid):
job = 1
MyThread.cmd_queues[uuid].put(job)
#handle clean up
if __name__ == "__main__":
for uuid in ['123', '234', '345']:
t = MyThread(uuid)
t.start()
kill_service('123')
如果这不是正确的解决方法,我们将不胜感激。
我想将消息发送到特定线程,这些线程充当服务的多个实例的控制器。这是每个线程将用于检查适用于它的消息的代码片段。评论应该解释它做得很好的地方(如果我的评论不好,请告诉我应该怎么写!)
这不是错误修复问题 - 我是线程处理的新手,想知道是否有更好的方法来执行此操作。我觉得有点乱。
def check_queue(self):
"""Gets a simple lock on the command queue, checks its size, then
iterates through jobs, looking for one that matches its uuid.
If there is a match, break the out
If not, put the job back on the queue, set job equal to None, and try again
"""
with simple_lock:
for i in range(cmd_q.qsize()):
self.job = cmd_q.get()
if job[0] == self.uuid
break
cmd_q.push(self.job)
self.job = None
def command_check(self)
while not self.job:
sleep(.5) #wait for things to happen
self.checkQueue()
if self.job == 'kill':
die_gracefully()
...etc
#example of a what commands are being passed
def kill_service(uuid):
job = [uuid, 1]
cmd_queue.put(job)
#handle clean up
任何 help/comments/critiques 将不胜感激。
编辑:simple_lock 就是 threading.Lock()。我用它来确保对 cmd_q.qsize() 的调用是准确的——否则,另一个进程可能会在调用 qsize 和完成作业之间添加到队列中。
你真的应该让每个线程都有自己独特的队列:创建一个从 uuid 映射到与该 uuid 一起的 Queue
的字典。生产者可以使用该字典来确定要推送到哪个队列,然后每个消费者都可以使用 q.get()
而无需锁定,如果它的 uuid 错误可能会重新排队等
我采用了您的示例代码并对其进行了一些扩展以演示它的外观:
from Queue import Queue
import threading
class MyThread(threading.Thread):
cmd_queues = {} # Map of uuid -> Queue for all instances of MyThread
def __init__(self, uuid):
super(threading.Thread, self).__init__()
self.cmd_q = Queue()
self.uuid = uuid
MyThread.cmd_queues[uuid] = self.cmd_q
def run(self):
while True:
self.command_check()
def command_check(self):
job = self.cmd_q.get()
if job == 'kill':
self.die_gracefully()
#...etc
def die_gracefully(self):
# stuff
def kill_service(uuid):
job = 1
MyThread.cmd_queues[uuid].put(job)
#handle clean up
if __name__ == "__main__":
for uuid in ['123', '234', '345']:
t = MyThread(uuid)
t.start()
kill_service('123')
在 Python 2.7 中,@dano 的回答(我认为这很棒)引发了运行时错误 - RuntimeError: thread.__init__() not called
。我相信可以通过在调用 super.
threading.Thread
替换为基础 class 名称来解决该异常。
from Queue import Queue
import threading
class MyThread(threading.Thread):
cmd_queues = {} # Map of uuid -> Queue for all instances of MyThread
def __init__(self, uuid):
super(MyThread, self).__init__()
self.cmd_q = Queue()
self.uuid = uuid
MyThread.cmd_queues[uuid] = self.cmd_q
def run(self):
while True:
self.command_check()
def command_check(self):
job = self.cmd_q.get()
if job == 'kill':
self.die_gracefully()
#...etc
def die_gracefully(self):
# stuff
pass
def kill_service(uuid):
job = 1
MyThread.cmd_queues[uuid].put(job)
#handle clean up
if __name__ == "__main__":
for uuid in ['123', '234', '345']:
t = MyThread(uuid)
t.start()
kill_service('123')
如果这不是正确的解决方法,我们将不胜感激。