我可以 运行 清理 python 中守护线程中的代码吗?

Can I run cleanup code in daemon threads in python?

假设我有一些消费者守护线程,只要主线程将对象放在那里并对其执行一些长时间的操作(几秒钟),它们就会不断地从队列中取出对象。

问题是每当主线程完成时,守护线程在完成处理队列中剩余的任何内容之前就被杀死。

我知道解决这个问题的一种方法是等待守护线程完成处理队列中剩余的任何内容然后退出,但我很好奇守护线程是否有任何方法可以“清理”当主线程退出时,在他们自己之后“完成处理队列中剩余的任何内容”,而不是明确地让主线程告诉守护线程开始清理。

这背后的动机是我制作了一个 python 包,它有一个日志记录处理程序 class 每当用户尝试记录某些内容时(例如 logging.info("message")),并且处理程序有一个守护线程,它通过网络发送日志。我更希望守护线程可以自行清理,这样包的用户就不必手动确保让他们的主线程等待日志处理程序完成其处理。

最小工作示例

# this code is in my package
class MyHandler(logging.Handler):
  def __init__(self, level):
    super().__init__(level=level)
    self.queue = Queue()
    self.thread = Thread(target=self.consume, daemon=True)
    self.thread.start()

  def emit(self, record):
    # This gets called whenever the user does logging.info, or similar
    self.queue.put(record)

  def consume(self):
    while True:
      record = self.queue.get()
      send(record) # send record over network, can take a few seconds (assume it never raises)
      self.queue.task_done()
# This is user's main code

# user will have to keep a reference to the handler for later. I want to avoid this.
my_handler = MyHandler()
# set up logging
logging.basicConfig(..., handlers=[..., my_handler])

# do some stuff...
logging.info("this will be sent over network")
# some more stuff...
logging.error("also sent over network")
# even more stuff

# before exiting must wait for handler to finish sending
# I don't want user to have to do this
my_hanler.queue.join()

您可以使用 threading.main_thread.join(),它会像这样等到关机:

import threading
import logging
import queue

class MyHandler(logging.Handler):
  def __init__(self, level):
    super().__init__(level=level)
    self.queue = queue.Queue()
    self.thread = threading.Thread(target=self.consume)  # Not daemon

    # Shutdown thread
    threading.Thread(
        target=lambda: threading.main_thread().join() or self.queue.put(None)
        ).start()
        
    self.thread.start()

  def emit(self, record):
    # This gets called whenever the user does logging.info, or similar
    self.queue.put(record)

  def consume(self):
    while True:
      record = self.queue.get()
      if record is None:
          print("cleaning")
          return  # Cleanup
      print(record) # send record over network, can take a few seconds (assume it never raises)
      self.queue.task_done()

快速测试代码:

logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(MyHandler(logging.INFO))
logging.info("Hello")
exit()

您可以使用atexit等待守护线程关闭:

import queue, threading, time, logging, atexit

class MyHandler(logging.Handler):
  def __init__(self, level):
    super().__init__(level=level)
    self.queue = queue.Queue()
    self.thread = threading.Thread(target=self.consume, daemon=True)

    # Right before main thread exits, signal cleanup and wait until done
    atexit.register(lambda: self.queue.put(None) or self.thread.join())

    self.thread.start()

  def emit(self, record):
    # This gets called whenever the user does logging.info, or similar
    self.queue.put(record)

  def consume(self):
    while True:
      record = self.queue.get()
      if record is None:  # Cleanup requested
          print("cleaning")
          time.sleep(5)
          return
      print(record) # send record over network, can take a few seconds (assume it never raises)
      self.queue.task_done()

# Test code
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(MyHandler(logging.INFO))
logging.info("Hello")