在多线程程序中通过队列发送数据的最佳机制是什么?
What is the best mechanism to send data through queues in a multithreaded program?
我正在制作一个多线程应用程序,其中主进程通过队列将消息发送到适当的线程。我的疑问在于线程的一部分:我发现的解决方案不断地倾听(达到极限,这就是为什么我有我的 class Clock 及其方法“isRunnig”,如果 returns True时间还没有过期)并且如果尝试没有数据到达,那么我会捕获异常并继续。
我先把主流程的代码简化一下:
def callUpdate (self, update : Update): #Update is a class that includes the correct ID of its thread and the data to process by the thread.
find = False
wrapp : _Wrapper = None
for current in self.threads:
if (type(current) is not _Wrapper): #_Wrapper is a class that includes the thread
continue
if not current.theThread.is_alive() :
#Here I save some data, and I remove the thread from
self.threads.remove(current)
continue
if (current.id == update.id):
wrapp = current
find = True
break
#Here I do some things and then, I create a new thread if not found and send first message (the update itself in this first send), or if its found and working (alive), I just send the data to the thread. Wrapper creates a new queue and saves the thread to send more data later if needed.
if (not find):
wrapp = _Wrapper(data)
self.threads.append(wrapp)
wrapp.queue.put(update)
bot.start()
else:
#Thread already working and I send the update
wrapp.queue.put(update)
好吧,现在我把线程部分简化了,这让我很担心,因为它看起来有点“草率”。请注意,我读取消息队列时停顿了 1 秒。
我有一个时钟 class 如果指示的时间已经过去(在本例中为 120 秒)
,它只是 returns
def process (self): #This process is part of the class that heritate from Thread (class ProcessThread (threading.Thread):
clock = Clock(seconds=120)
while (clock.isRunning()):
update: Update = self.getUpdateFromQueue(seconds=1)
if (update is None) : continue
#At this point, the message update is correct and I process the data. Once the clock is finnish, I finnish the process
return
问题是有时候程序执行的很慢,线程少或者线程多(好像跟它没有关系);我也尝试过减少队列的重读时间(因为如果有很多请求似乎会导致问题)。
我觉得它很老套,任何人都可以建议我在多线程中接收排队数据的任何其他选项吗?
谢谢
------------ 编辑----------
抱歉,我没有包括从队列中获取数据的过程:
#Get data from queue, maximum wait time in seconds.
def getUpdateFromQueue (self, seconds=10):
max = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
current = datetime.datetime.now()
while (current < max):
try:
data : Update = self.queue.get(timeout=0.01)
return data
except Empty:
current = datetime.datetime.now()
continue
return None
您的代码无缘无故地旋转和等待,这自然会损害性能;你根本不应该在你自己的代码中这样做。而是使用 queue.Queue
中的超时功能来处理您的超时。
例如,getUpdateFromQueue
不需要循环查看 short-timed 调用 queue.get
之间的时间;它可以将 seconds
最大值直接传递给 queue.get
:
def getUpdateFromQueue(self, seconds=10):
try:
return self.queue.get(timeout=seconds)
except Empty:
return None
但是您一开始不需要将它作为自己的功能。而不是:
def process(self):
clock = Clock(seconds=120)
while (clock.isRunning()):
update: Update = self.getUpdateFromQueue(seconds=1)
if (update is None) : continue
return
您可以直接将 queue.get
与您尝试使用 Clock
class:
强制执行的总体最大超时一起使用
def process(self):
try:
return self.queue.get(timeout=120)
except Empty:
return None
那应该是一样的效果(return一条数据,最多等待120秒才returning None
代替),不用两层嵌套while
不断旋转 CPU 的循环(两者做同样的事情,只是分辨率不同)。
如果您需要处理多条消息,您只需要一个循环,您可以在其中调整每个 get()
的超时以反映总体截止日期。 (我在这里使用 time.monotonic()
是因为根据定义它不会被系统时钟的变化所抛弃。)
from queue import Empty
from time import monotonic
def process(self, data):
# do whatever you need to do with one piece of data
pass
def process_messages_with_timeout(self, timeout=120):
deadline = monotonic() + timeout
while True:
try:
self.process(self.queue.get(timeout=deadline - monotonic()))
except Empty:
break
重要的是,对于您真正想要获得的每个项目,您只需要调用一次 get()
,并设置实际超时时间;使用比您想要的更短的超时时间执行 get()
然后添加额外的逻辑以在实际超时时间内重试是没有意义的。在循环中添加额外的循环没有任何意义。
我正在制作一个多线程应用程序,其中主进程通过队列将消息发送到适当的线程。我的疑问在于线程的一部分:我发现的解决方案不断地倾听(达到极限,这就是为什么我有我的 class Clock 及其方法“isRunnig”,如果 returns True时间还没有过期)并且如果尝试没有数据到达,那么我会捕获异常并继续。
我先把主流程的代码简化一下:
def callUpdate (self, update : Update): #Update is a class that includes the correct ID of its thread and the data to process by the thread.
find = False
wrapp : _Wrapper = None
for current in self.threads:
if (type(current) is not _Wrapper): #_Wrapper is a class that includes the thread
continue
if not current.theThread.is_alive() :
#Here I save some data, and I remove the thread from
self.threads.remove(current)
continue
if (current.id == update.id):
wrapp = current
find = True
break
#Here I do some things and then, I create a new thread if not found and send first message (the update itself in this first send), or if its found and working (alive), I just send the data to the thread. Wrapper creates a new queue and saves the thread to send more data later if needed.
if (not find):
wrapp = _Wrapper(data)
self.threads.append(wrapp)
wrapp.queue.put(update)
bot.start()
else:
#Thread already working and I send the update
wrapp.queue.put(update)
好吧,现在我把线程部分简化了,这让我很担心,因为它看起来有点“草率”。请注意,我读取消息队列时停顿了 1 秒。 我有一个时钟 class 如果指示的时间已经过去(在本例中为 120 秒)
,它只是 returnsdef process (self): #This process is part of the class that heritate from Thread (class ProcessThread (threading.Thread):
clock = Clock(seconds=120)
while (clock.isRunning()):
update: Update = self.getUpdateFromQueue(seconds=1)
if (update is None) : continue
#At this point, the message update is correct and I process the data. Once the clock is finnish, I finnish the process
return
问题是有时候程序执行的很慢,线程少或者线程多(好像跟它没有关系);我也尝试过减少队列的重读时间(因为如果有很多请求似乎会导致问题)。 我觉得它很老套,任何人都可以建议我在多线程中接收排队数据的任何其他选项吗?
谢谢
------------ 编辑---------- 抱歉,我没有包括从队列中获取数据的过程:
#Get data from queue, maximum wait time in seconds.
def getUpdateFromQueue (self, seconds=10):
max = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
current = datetime.datetime.now()
while (current < max):
try:
data : Update = self.queue.get(timeout=0.01)
return data
except Empty:
current = datetime.datetime.now()
continue
return None
您的代码无缘无故地旋转和等待,这自然会损害性能;你根本不应该在你自己的代码中这样做。而是使用 queue.Queue
中的超时功能来处理您的超时。
例如,getUpdateFromQueue
不需要循环查看 short-timed 调用 queue.get
之间的时间;它可以将 seconds
最大值直接传递给 queue.get
:
def getUpdateFromQueue(self, seconds=10):
try:
return self.queue.get(timeout=seconds)
except Empty:
return None
但是您一开始不需要将它作为自己的功能。而不是:
def process(self):
clock = Clock(seconds=120)
while (clock.isRunning()):
update: Update = self.getUpdateFromQueue(seconds=1)
if (update is None) : continue
return
您可以直接将 queue.get
与您尝试使用 Clock
class:
def process(self):
try:
return self.queue.get(timeout=120)
except Empty:
return None
那应该是一样的效果(return一条数据,最多等待120秒才returning None
代替),不用两层嵌套while
不断旋转 CPU 的循环(两者做同样的事情,只是分辨率不同)。
如果您需要处理多条消息,您只需要一个循环,您可以在其中调整每个 get()
的超时以反映总体截止日期。 (我在这里使用 time.monotonic()
是因为根据定义它不会被系统时钟的变化所抛弃。)
from queue import Empty
from time import monotonic
def process(self, data):
# do whatever you need to do with one piece of data
pass
def process_messages_with_timeout(self, timeout=120):
deadline = monotonic() + timeout
while True:
try:
self.process(self.queue.get(timeout=deadline - monotonic()))
except Empty:
break
重要的是,对于您真正想要获得的每个项目,您只需要调用一次 get()
,并设置实际超时时间;使用比您想要的更短的超时时间执行 get()
然后添加额外的逻辑以在实际超时时间内重试是没有意义的。在循环中添加额外的循环没有任何意义。