Process with finished thread never exits

Why should a thread persist and prevent its process from exiting, even after its target is done?

While this question uses an additional child process, the underlying issue is entirely rooted in multithreading. Therefore, this basic issue can be reproduced with the MainProcess alone. (Edited by @Darkonaut)

I made a class that inherits from multiprocessing.Process:

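import threading
from multiprocessing import Process


class Task(Process):
    def run(self):
        print("RUN")

        # do_some_work is the asker's worker function (not shown here)
        t = threading.Thread(target=do_some_work)
        t.start()
        # ...
        t.join()
        print("CLOSED")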

I start it like this:

proc = Task()
proc.start()
proc.join()
print("JOINED")

But it never joins, and the output looks like this:

>> RUN
>> CLOSED

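I'm not using any kind of Queues or Pipes.

When I ran this on Ubuntu, I tracked the process by its pid. The process still exists even after the print("CLOSED") line has completed without any exception. I also ran it on Windows and tracked the process in Task Manager. There the process exits after print("CLOSED"), but it still doesn't join.

Another point: on Ubuntu, when everything is stuck after print("CLOSED") and I press Ctrl + C, I get this: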

Traceback (most recent call last):
  File "Scheduler.py", line 164, in <module>
    scheduler.start()
  File "Scheduler.py", line 152, in start
    self.enqueueTask(plan)
  File "Scheduler.py", line 134, in enqueueTask
    proc.join()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)

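According to the last line, I guess the main process is waiting for something, but for what, and why?

The problem seems to be with a non-daemon thread that I'm starting in the run() method of Task. Making this thread a daemon thread solves the problem, so I can say for sure that this thread is preventing my process from closing even after its MainThread is done. I'm still confused, though, because the target function of that non-daemon thread completed successfully.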

Why should a thread persist and prevent its process from exiting, even after its target is done?

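While this question uses an additional child process, the underlying issue is entirely rooted in multithreading. Therefore, this basic issue can be reproduced with the MainProcess alone. An answer involving an additional child process can be found in edit 2.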

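Scenario

Without seeing what the new thread in your child process is really doing, a likely scenario for the behavior you observe is that your thread-1 is starting yet another thread-2, which you might not even be aware of. Possibly it is started from a third-party library you are calling into or, to stay within the stdlib, multiprocessing.Queue.put() also starts a feeder thread in the background.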
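As a rough illustration of such a "hidden" thread, the sketch below compares the live threads before and after the first put() on a multiprocessing.Queue. The helper names thread_names and feeder_thread_demo are made up for this example, and the exact thread names you see may differ between Python versions.

```python
import multiprocessing
import threading


def thread_names():
    """Names of all threads currently alive in this process."""
    return sorted(t.name for t in threading.enumerate())


def feeder_thread_demo():
    """Return the thread names before and after the first Queue.put()."""
    before = thread_names()
    q = multiprocessing.Queue()
    q.put("item")            # the first put() lazily starts the feeder thread
    after = thread_names()
    q.get(timeout=5)         # drain the queue again
    q.close()                # tell the feeder thread to finish ...
    q.join_thread()          # ... and wait for it
    return before, after


if __name__ == "__main__":
    before, after = feeder_thread_demo()
    print("before put():", before)
    print("after put(): ", after)
```

The second list should contain one thread more than the first, even though the calling code never created a thread itself.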

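This general scenario is neither a Process-subclassing issue, nor is it related to calling Process.close() from within the child process itself (incorrect usage, but without consequences).

The MainThread in a process is always the last thread in the process to exit, and it joins non-daemon threads as part of its _shutdown() routine. That is what keeps the MainThread in limbo while its "surface" work is already done.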
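The effect of this shutdown join can be observed from the outside. The following is a minimal sketch, not part of the original question: it runs a tiny script in a fresh interpreter, once with a non-daemon and once with a daemon thread, and collects what the script printed. The names CHILD and run_child are hypothetical helpers for this illustration.

```python
import subprocess
import sys

# Script executed in a separate interpreter; {daemon} is filled in below.
CHILD = """
import threading, time

def work():
    time.sleep(0.5)
    print("thread done")

threading.Thread(target=work, daemon={daemon}).start()
print("main done")
"""


def run_child(daemon):
    """Run CHILD in a fresh interpreter and return its stdout lines."""
    proc = subprocess.run(
        [sys.executable, "-c", CHILD.format(daemon=daemon)],
        capture_output=True,
        text=True,
        timeout=30,
    )
    return proc.stdout.splitlines()


if __name__ == "__main__":
    # Non-daemon: the interpreter waits for the thread before exiting.
    print(run_child(daemon=False))
    # Daemon: the interpreter exits without waiting for the thread.
    print(run_child(daemon=True))
```

With the non-daemon thread, "thread done" still appears after "main done", because the interpreter waits for the thread. With the daemon thread, the process exits right after "main done".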

The problem is with a non-daemon thread that I'm starting in the run() method of Task. So I can surely say that thread is preventing my process from closing even after its MainThread is done. But I'm still confused, because the target function of that non-daemon thread completed successfully.

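Now, in this scenario, the target function of your thread-1 can actually finish successfully. This thread-1, however, has started another thread-2, which then does something that lasts very long, like blocking forever in the worst case.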

Q: If thread-1 itself is not the problem, why is there no hang when you make thread-1 a daemon?

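That is because the daemon flag's "initial value is inherited from the creating thread". So making thread-1 a daemon makes its descendant thread-2 a daemon too, unless the daemon flag for thread-2 is set explicitly. Daemon threads are not joined on shutdown, and the whole process "exits when no alive non-daemon threads are left".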
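The inheritance of the daemon flag can be checked directly. This is a small sketch with made-up helper names (spawn_child, inherited_daemon_flag): a parent thread creates a child thread without passing daemon, and the child's flag is recorded.

```python
import threading


def spawn_child(result):
    """Runs in thread-1: create thread-2 without an explicit daemon argument."""
    child = threading.Thread(target=lambda: None)
    result.append(child.daemon)  # value inherited from the creating thread
    child.start()
    child.join()


def inherited_daemon_flag(parent_daemon):
    """Return the daemon flag a child inherits from a parent thread."""
    result = []
    parent = threading.Thread(
        target=spawn_child, args=(result,), daemon=parent_daemon
    )
    parent.start()
    parent.join()
    return result[0]


if __name__ == "__main__":
    print(inherited_daemon_flag(True))   # True
    print(inherited_daemon_flag(False))  # False
```

Passing daemon=False explicitly when creating the child thread would override the inherited value.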

Note that prior to Python 3.7, non-daemon threads created by a Process were not joined. This divergent behavior for threads outside the MainProcess was fixed in bpo-18966.

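Code

To show that this scenario can already be reproduced with a much simpler setup, the example below uses the MainProcess as the process that won't exit. thread-2 here is a Timer thread, which starts right away and, after 10 seconds, calls threading.Barrier(parties=parties).wait(). This .wait() call returns immediately with parties=1, or blocks forever with parties=2, because in our setup no other party ever calls .wait() on this Barrier. That makes it easy to toggle the behavior we want to reproduce.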

import threading

# get_mp_logger() is the logging helper defined at the end of this answer.

def blackbox(parties):
    """Dummy for starting thread we might not know about."""
    timer = threading.Timer(10, threading.Barrier(parties=parties).wait)  # Thread-2
    timer.name = "TimerThread"
    timer.start()


def t1_target(parties):  # Thread-1
    """Start another thread and exit without joining."""
    logger = get_mp_logger()
    logger.info(f"ALIVE: {[t.name for t in threading.enumerate()]}")
    blackbox(parties)
    logger.info(f"ALIVE: {[t.name for t in threading.enumerate()]}")
    logger.info("DONE")


if __name__ == '__main__':

    import logging

    parties = 1
    daemon = False
    print(f"parties={parties}, daemon={daemon}")

    logger = get_mp_logger(logging.INFO)
    logger.info(f"ALIVE: {[t.name for t in threading.enumerate()]}")
    t = threading.Thread(target=t1_target, args=(parties,), daemon=daemon)
    t.start()
    t.join()
    logger.info(f"ALIVE: {[t.name for t in threading.enumerate()]}")    
    logger.info("DONE")
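The two Barrier behaviors the example relies on can be checked in isolation. In this sketch, barrier_wait_returns is a hypothetical helper, and the timeout is added only so the demonstration terminates; the example above calls .wait() without a timeout, so with parties=2 it really blocks forever.

```python
import threading


def barrier_wait_returns(parties, timeout=0.2):
    """Return True if a lone .wait() on a fresh Barrier gets through."""
    barrier = threading.Barrier(parties=parties)
    try:
        barrier.wait(timeout=timeout)
        return True
    except threading.BrokenBarrierError:
        # Raised when the timeout expires before all parties have arrived.
        return False


if __name__ == "__main__":
    print(barrier_wait_returns(1))  # True: a single party is enough
    print(barrier_wait_returns(2))  # False: the second party never arrives
```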

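The log below is for parties=1, so there is no infinite blocking, but since thread-2 is not a daemon thread, the MainThread will join it on shutdown. Note that TimerThread is still alive after t1_target is done. Of main interest here is that it takes about 10 seconds to get from the MainThread's "DONE" to "process shutting down". These are the 10 seconds the TimerThread is alive.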

parties=1, daemon=False
[18:04:31,977 MainThread <module>] ALIVE: ['MainThread']
[18:04:31,977 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1']
[18:04:31,978 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1', 'TimerThread']
[18:04:31,978 Thread-1 t1_target] DONE
[18:04:31,978 MainThread <module>] ALIVE: ['MainThread', 'TimerThread']
[18:04:31,978 MainThread <module>] DONE
[18:04:41,978 MainThread info] process shutting down

Process finished with exit code 0

With parties=2 it hangs forever at this stage, ...

parties=2, daemon=False
[18:05:06,010 MainThread <module>] ALIVE: ['MainThread']
[18:05:06,010 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1']
[18:05:06,011 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1', 'TimerThread']
[18:05:06,011 Thread-1 t1_target] DONE
[18:05:06,011 MainThread <module>] ALIVE: ['MainThread', 'TimerThread']
[18:05:06,011 MainThread <module>] DONE

...unless you also set daemon=True, either for thread-1 (thread-2 inherits it) or directly for thread-2.

parties=2, daemon=True
[18:05:35,539 MainThread <module>] ALIVE: ['MainThread']
[18:05:35,539 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1']
[18:05:35,539 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1', 'TimerThread']
[18:05:35,539 Thread-1 t1_target] DONE
[18:05:35,539 MainThread <module>] ALIVE: ['MainThread', 'TimerThread']
[18:05:35,539 MainThread <module>] DONE
[18:05:35,539 MainThread info] process shutting down

Process finished with exit code 0
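When a process hangs like in the logs above and it is unclear which thread is responsible, one possible diagnostic is to list the live non-daemon threads shortly before the main thread finishes. The helper name threads_blocking_exit is an assumption made for this sketch, and the Timer only stands in for an unknown background thread.

```python
import threading


def threads_blocking_exit():
    """Names of live non-daemon threads other than the MainThread.

    These are the threads the MainThread will join during shutdown.
    """
    main = threading.main_thread()
    return [
        t.name
        for t in threading.enumerate()
        if t is not main and not t.daemon
    ]


if __name__ == "__main__":
    timer = threading.Timer(60, lambda: None)  # stand-in for a hidden thread
    timer.name = "TimerThread"
    timer.start()
    print(threads_blocking_exit())  # includes 'TimerThread'

    timer.cancel()                  # let the timer thread finish early
    timer.join()
    print(threads_blocking_exit())  # 'TimerThread' is gone
```

Any name this function reports is a thread that has to finish, or be made a daemon, before the process can exit.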

Helper

DEFAULT_MP_FORMAT = \
    '[%(asctime)s,%(msecs)03d %(threadName)s %(funcName)s]' \
    ' %(message)s'
DEFAULT_DATEFORMAT = "%H:%M:%S"  # "%Y-%m-%d %H:%M:%S"


def get_mp_logger(level=None, fmt=DEFAULT_MP_FORMAT, datefmt=DEFAULT_DATEFORMAT):
    """
    Initialize multiprocessing-logger if needed and return reference.
    """
    import multiprocessing.util as util
    import logging
    logger = util.get_logger()
    if not logger.handlers:
        logger = util.log_to_stderr(level)
    logger.handlers[0].setFormatter(logging.Formatter(fmt, datefmt))
    return logger