在多线程程序中通过队列发送数据的最佳机制是什么?

What is the best mechanism to send data through queues in a multithreaded program?

我正在制作一个多线程应用程序,其中主进程通过队列将消息发送到适当的线程。我的疑问在于线程的一部分:我发现的解决方案不断地倾听(达到极限,这就是为什么我有我的 class Clock 及其方法“isRunnig”,如果 returns True时间还没有过期)并且如果尝试没有数据到达,那么我会捕获异常并继续。

我先把主流程的代码简化一下:

    def callUpdate (self, update : Update): #Update is a class that includes the correct ID of its thread and the data to process by the thread.
        find = False
        wrapp : _Wrapper = None
        for current in self.threads:
            if (type(current) is not _Wrapper): #_Wrapper is a class that includes the thread
                continue
            if not current.theThread.is_alive() :
                #Here I save some data, and I remove the thread from
                self.threads.remove(current) 
                continue

            if (current.id == update.id):
                wrapp = current
                find = True
                break
        #Here I do some things and then, I create a new thread if not found and send first message (the update itself in this first send), or if its found and working (alive), I just send the data to the thread. Wrapper creates a new queue and saves the thread to send more data later if needed.
        if (not find):
            wrapp = _Wrapper(data)
            self.threads.append(wrapp)        
            wrapp.queue.put(update)
            bot.start()
        else:
            #Thread already working and I send the update
            wrapp.queue.put(update)

好吧,现在我把线程部分简化了,这让我很担心,因为它看起来有点“草率”。请注意,我读取消息队列时停顿了 1 秒。 我有一个时钟 class 如果指示的时间已经过去(在本例中为 120 秒)

,它只是 returns
def process (self): #This process is part of the class that heritate from Thread (class ProcessThread (threading.Thread):
  clock = Clock(seconds=120)
  while (clock.isRunning()):
            update: Update = self.getUpdateFromQueue(seconds=1)
            if (update is None) : continue
            #At this point, the message update is correct and I process the data. Once the clock is finnish, I finnish the process
  return

问题是有时候程序执行的很慢,线程少或者线程多(好像跟它没有关系);我也尝试过减少队列的重读时间(因为如果有很多请求似乎会导致问题)。 我觉得它很老套,任何人都可以建议我在多线程中接收排队数据的任何其他选项吗?

谢谢

------------ 编辑---------- 抱歉,我没有包括从队列中获取数据的过程:

    #Get data from queue, maximum wait time in seconds.
    def getUpdateFromQueue (self, seconds=10):
        max = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
        current = datetime.datetime.now()
        while (current < max):
            try:
                data : Update = self.queue.get(timeout=0.01)                
                return data
            except Empty:
                current = datetime.datetime.now()
                continue
        return None

您的代码无缘无故地旋转和等待,这自然会损害性能;你根本不应该在你自己的代码中这样做。而是使用 queue.Queue 中的超时功能来处理您的超时。

例如,getUpdateFromQueue 不需要循环查看 short-timed 调用 queue.get 之间的时间;它可以将 seconds 最大值直接传递给 queue.get:

def getUpdateFromQueue(self, seconds=10):
    try:
        return self.queue.get(timeout=seconds)
    except Empty:
        return None

但是您一开始不需要将它作为自己的功能。而不是:

def process(self):
    clock = Clock(seconds=120)
    while (clock.isRunning()):
        update: Update = self.getUpdateFromQueue(seconds=1)
        if (update is None) : continue
    return

您可以直接将 queue.get 与您尝试使用 Clock class:

强制执行的总体最大超时一起使用
def process(self):
    try:
        return self.queue.get(timeout=120)
    except Empty:
        return None

那应该是一样的效果(return一条数据,最多等待120秒才returning None代替),不用两层嵌套while 不断旋转 CPU 的循环(两者做同样的事情,只是分辨率不同)。

如果您需要处理多条消息,您只需要一个循环,您可以在其中调整每个 get() 的超时以反映总体截止日期。 (我在这里使用 time.monotonic() 是因为根据定义它不会被系统时钟的变化所抛弃。)

from queue import Empty
from time import monotonic


def process(self, data):
    # do whatever you need to do with one piece of data
    pass

def process_messages_with_timeout(self, timeout=120):
    deadline = monotonic() + timeout
    while True:
        try:
            self.process(self.queue.get(timeout=deadline - monotonic()))
        except Empty:
            break

重要的是,对于您真正想要获得的每个项目,您只需要调用一次 get(),并设置实际超时时间;使用比您想要的更短的超时时间执行 get() 然后添加额外的逻辑以在实际超时时间内重试是没有意义的。在循环中添加额外的循环没有任何意义。