强制一个线程阻止所有其他线程执行
Forcing a thread to block all other threads from executing
更新:
This answer states that what I'm trying to do is impossible as of April 2013. This, however, seems to contradict what Alex Martelli says in Python Cookbook(第 624 页,第 3 版):
Upon return, PyGILState_Ensure() always guarantees that the calling
thread has exclusive access to the Python interpreter. This is true
even if the calling C code is running a different thread that is
unknown to the interpreter.
The docs 似乎也暗示可以获取 GIL,这会给我希望(除了我不认为我可以从纯 python 代码调用 PyGILState_Ensure()
,如果我创建了一个 C 扩展来调用它,我不确定如何将我的 memory_daemon()
嵌入其中。
也许我误读了答案或 Python 食谱和文档。
原始问题:
我想要一个给定的线程(来自 threading
模块)在其代码的某个片段正在执行时阻止任何其他线程 运行。实现它的最简单方法是什么?
显然,最好尽量减少其他线程中的代码更改,避免使用 C 和直接 OS 调用,并使其跨平台用于 windows 和 linux.但实际上,我很乐意为我的实际环境提供任何解决方案(见下文)。
环境:
- CPython
- python 3.4(但如果有帮助可以升级到 3.5)
- Ubuntu 14.04
用例:
出于调试目的,我计算了所有对象使用的内存(由 gc.get_objects()
报告),并将一些摘要报告打印到 sys.stderr
。我在一个单独的线程中执行此操作,因为我希望此摘要从其他线程异步传送;我将 time.sleep(10)
放在执行实际内存使用计算的 while True
循环的末尾。然而,内存报告线程需要一段时间才能完成每个报告,我不希望所有其他线程在内存计算完成之前继续前进(否则,内存快照将很难解释)。
示例(澄清问题):
import threading as th
import time
def report_memory_consumption():
# go through `gc.get_objects()`, check their size and print a summary
# takes ~5 min to run
def memory_daemon():
while True:
# all other threads should not do anything until this call is complete
report_memory_consumption()
# sleep for 10 sec, then update memory summary
# this sleep is the only time when other threads should be executed
time.sleep(10)
def f1():
# do something, including calling many other functions
# takes ~3 min to run
def f2():
# do something, including calling many other functions
# takes ~3 min to run
def main():
t_mem = th.Thread(target = memory_daemon)
t1 = th.Thread(target = f1)
t2 = th.Thread(target = f2)
t_mem.start()
t1.start()
t2.start()
# requirement: no other thread is running while t_mem is not sleeping
由于全局解释器锁,Python 总是 一次执行一个线程。当涉及 multiprocessing
时,它不会这样做。您可以查看 以了解有关 CPython.
中 GIL 的更多信息
请注意,这是伪代码,因为我不知道您是如何创建 threads/using them/which 您在线程中执行的代码。
import threading, time
l=threading.Lock()
locked=False
def worker():
l.acquire()
locked=True
#do something
l.release()
def test():
while locked:
time.sleep(10)
#do something
threads = []
t = threading.Thread(target=worker)
threads.append(t)
t = threading.Thread(target=test)
threads.append(t)
for th in threads:
th.start()
for th in threads:
th.join()
当然可以写的更好,可以优化
您应该使用线程锁在线程之间同步执行代码。给出的答案有些正确,但我会使用可重入本地人再次检查以查看您是否确实拥有锁。
不要使用另一个答案中描述的变量来检查是否拥有锁。变量可能会在多个线程之间损坏。可重入锁就是为了解决这个问题。
此外,该代码中的不正确之处在于,假定代码之间的代码不抛出异常就释放了锁。所以总是在 with
上下文或 try-catch-finally
.
中做
这是一个很棒的article explaining synchronization in Python and threading docs。
编辑:回答 OP 关于在 C
中嵌入 Python 的更新
你误解了他在食谱上说的话。 PyGILState_Ensure
returns GIL,如果 GIL 在 当前 python 解释器 中可用,而不是 python 未知的 C 线程口译员。
您不能强制从当前解释器中的其他线程获取 GIL。想象一下,如果你能做到,那么基本上你会蚕食所有其他线程。
作为权宜之计(出于显而易见的原因),以下方法对我有用:
def report_memory_consumption():
sys.setswitchinterval(1000) # longer than the expected run time
# go through `gc.get_objects()`, check their size and print a summary
# takes ~5 min to run
sys.setswitchinterval(0.005) # the default value
如果谁有更好的答案,请post吧。
Python 食谱是正确的。在 PyGILState_Ensure()
returns 时,您可以独占访问 Python 解释器。独占访问意味着您可以安全地调用所有 CPython 函数。这意味着当前的 C 线程也是当前活动的 Python 线程。如果当前C线程之前没有对应的Python线程,PyGILState_Ensure()
会自动为你创建一个。
那是PyGILState_Ensure()
之后的状态。那时您还获得了 GIL。
然而,当您现在调用其他 CPython 函数时,例如 PyEval_EvalCode()
或任何其他函数,它们可以隐式地使 GIL 同时被释放。例如,如果 Python 语句 time.sleep(0.1)
结果在某处被隐式调用,就是这种情况。当 GIL 从该线程中释放时,其他 Python 线程可以 运行.
您只能保证当 PyEval_EvalCode()
(或您调用的任何其他 CPython 函数)returns 时,您将再次拥有与以前相同的状态 - 即您是在同一个活动 Python 线程上,您又拥有了 GIL。
关于你原来的问题:目前没有办法实现这一点,即调用 Python 代码并避免 GIL 同时在某处被释放。这是一件好事,否则你很容易陷入僵局,例如如果您不允许其他线程释放它当前持有的一些锁。
关于如何实现您的用例:唯一真正的方法是在 C 中。您可以调用 PyGILState_Ensure()
来获取 GIL。那时,您必须只调用那些不会产生调用其他 Python 代码的副作用的 CPython 函数。要特别小心。即使 PyObj_DecRef()
也可以调用 __del__
。最好的办法是避免调用任何 CPython 函数并手动遍历 CPython 对象。请注意,您可能不必像概述的那样复杂:有底层的 CPython 内存分配器,我认为您可以从那里获取信息。
阅读 here 关于 CPython 中的内存管理。
相关代码在pymem.h, obmalloc.c and pyarena.c。请参阅函数 _PyObject_DebugMallocStats()
,尽管它可能不会编译到您的 CPython.
还有 tracemalloc module which however will add some overhead. Maybe its underlying C code (file _tracemalloc.c) 有助于更好地理解内部结构。
关于sys.setswitchinterval(1000)
:这仅与遍历Python字节码并处理它有关。这基本上就是文件ceval.c中PyEval_EvalFrameEx
中CPython的主循环。在那里你会找到这样的部分:
if (_Py_atomic_load_relaxed(&gil_drop_request))
...
文件ceval_gil.h.
中涵盖了所有具有切换间隔的逻辑
设置高的切换间隔只是意味着PyEval_EvalFrameEx
中的主循环不会被中断更长的时间。这并不意味着没有其他可能性可以同时释放 GIL 并且另一个线程可以 运行.
PyEval_EvalFrameEx
将执行 Python 字节码。让我们假设这会调用 time.sleep(1)
。这将调用该函数的本机 C 实现。您会在文件 timemodule.c 的 time_sleep()
中找到它。如果您遵循该代码,您会发现:
Py_BEGIN_ALLOW_THREADS
err = select(0, (fd_set *)0, (fd_set *)0, (fd_set *)0, &timeout);
Py_END_ALLOW_THREADS
因此,GIL 也同时发布了。现在,任何其他正在等待 GIL 的线程都可以获取它和 运行 其他 Python 代码。
理论上,您可能会想,如果您设置一个高切换间隔并且从不调用任何 Python 代码,这反过来可能会在某个时候释放 GIL,您将是安全的。请注意,这几乎是不可能的。例如。 GC 会不时被调用,某些对象的任何 __del__
都可能产生各种副作用。
更新:
This answer states that what I'm trying to do is impossible as of April 2013. This, however, seems to contradict what Alex Martelli says in Python Cookbook(第 624 页,第 3 版):
Upon return, PyGILState_Ensure() always guarantees that the calling thread has exclusive access to the Python interpreter. This is true even if the calling C code is running a different thread that is unknown to the interpreter.
The docs 似乎也暗示可以获取 GIL,这会给我希望(除了我不认为我可以从纯 python 代码调用 PyGILState_Ensure()
,如果我创建了一个 C 扩展来调用它,我不确定如何将我的 memory_daemon()
嵌入其中。
也许我误读了答案或 Python 食谱和文档。
原始问题:
我想要一个给定的线程(来自 threading
模块)在其代码的某个片段正在执行时阻止任何其他线程 运行。实现它的最简单方法是什么?
显然,最好尽量减少其他线程中的代码更改,避免使用 C 和直接 OS 调用,并使其跨平台用于 windows 和 linux.但实际上,我很乐意为我的实际环境提供任何解决方案(见下文)。
环境:
- CPython
- python 3.4(但如果有帮助可以升级到 3.5)
- Ubuntu 14.04
用例:
出于调试目的,我计算了所有对象使用的内存(由 gc.get_objects()
报告),并将一些摘要报告打印到 sys.stderr
。我在一个单独的线程中执行此操作,因为我希望此摘要从其他线程异步传送;我将 time.sleep(10)
放在执行实际内存使用计算的 while True
循环的末尾。然而,内存报告线程需要一段时间才能完成每个报告,我不希望所有其他线程在内存计算完成之前继续前进(否则,内存快照将很难解释)。
示例(澄清问题):
import threading as th
import time
def report_memory_consumption():
# go through `gc.get_objects()`, check their size and print a summary
# takes ~5 min to run
def memory_daemon():
while True:
# all other threads should not do anything until this call is complete
report_memory_consumption()
# sleep for 10 sec, then update memory summary
# this sleep is the only time when other threads should be executed
time.sleep(10)
def f1():
# do something, including calling many other functions
# takes ~3 min to run
def f2():
# do something, including calling many other functions
# takes ~3 min to run
def main():
t_mem = th.Thread(target = memory_daemon)
t1 = th.Thread(target = f1)
t2 = th.Thread(target = f2)
t_mem.start()
t1.start()
t2.start()
# requirement: no other thread is running while t_mem is not sleeping
Python 总是 一次执行一个线程。当涉及 multiprocessing
时,它不会这样做。您可以查看
请注意,这是伪代码,因为我不知道您是如何创建 threads/using them/which 您在线程中执行的代码。
import threading, time
l=threading.Lock()
locked=False
def worker():
l.acquire()
locked=True
#do something
l.release()
def test():
while locked:
time.sleep(10)
#do something
threads = []
t = threading.Thread(target=worker)
threads.append(t)
t = threading.Thread(target=test)
threads.append(t)
for th in threads:
th.start()
for th in threads:
th.join()
当然可以写的更好,可以优化
您应该使用线程锁在线程之间同步执行代码。给出的答案有些正确,但我会使用可重入本地人再次检查以查看您是否确实拥有锁。
不要使用另一个答案中描述的变量来检查是否拥有锁。变量可能会在多个线程之间损坏。可重入锁就是为了解决这个问题。
此外,该代码中的不正确之处在于,假定代码之间的代码不抛出异常就释放了锁。所以总是在 with
上下文或 try-catch-finally
.
这是一个很棒的article explaining synchronization in Python and threading docs。
编辑:回答 OP 关于在 C
中嵌入 Python 的更新你误解了他在食谱上说的话。 PyGILState_Ensure
returns GIL,如果 GIL 在 当前 python 解释器 中可用,而不是 python 未知的 C 线程口译员。
您不能强制从当前解释器中的其他线程获取 GIL。想象一下,如果你能做到,那么基本上你会蚕食所有其他线程。
作为权宜之计(出于显而易见的原因),以下方法对我有用:
def report_memory_consumption():
sys.setswitchinterval(1000) # longer than the expected run time
# go through `gc.get_objects()`, check their size and print a summary
# takes ~5 min to run
sys.setswitchinterval(0.005) # the default value
如果谁有更好的答案,请post吧。
Python 食谱是正确的。在 PyGILState_Ensure()
returns 时,您可以独占访问 Python 解释器。独占访问意味着您可以安全地调用所有 CPython 函数。这意味着当前的 C 线程也是当前活动的 Python 线程。如果当前C线程之前没有对应的Python线程,PyGILState_Ensure()
会自动为你创建一个。
那是PyGILState_Ensure()
之后的状态。那时您还获得了 GIL。
然而,当您现在调用其他 CPython 函数时,例如 PyEval_EvalCode()
或任何其他函数,它们可以隐式地使 GIL 同时被释放。例如,如果 Python 语句 time.sleep(0.1)
结果在某处被隐式调用,就是这种情况。当 GIL 从该线程中释放时,其他 Python 线程可以 运行.
您只能保证当 PyEval_EvalCode()
(或您调用的任何其他 CPython 函数)returns 时,您将再次拥有与以前相同的状态 - 即您是在同一个活动 Python 线程上,您又拥有了 GIL。
关于你原来的问题:目前没有办法实现这一点,即调用 Python 代码并避免 GIL 同时在某处被释放。这是一件好事,否则你很容易陷入僵局,例如如果您不允许其他线程释放它当前持有的一些锁。
关于如何实现您的用例:唯一真正的方法是在 C 中。您可以调用 PyGILState_Ensure()
来获取 GIL。那时,您必须只调用那些不会产生调用其他 Python 代码的副作用的 CPython 函数。要特别小心。即使 PyObj_DecRef()
也可以调用 __del__
。最好的办法是避免调用任何 CPython 函数并手动遍历 CPython 对象。请注意,您可能不必像概述的那样复杂:有底层的 CPython 内存分配器,我认为您可以从那里获取信息。
阅读 here 关于 CPython 中的内存管理。
相关代码在pymem.h, obmalloc.c and pyarena.c。请参阅函数 _PyObject_DebugMallocStats()
,尽管它可能不会编译到您的 CPython.
还有 tracemalloc module which however will add some overhead. Maybe its underlying C code (file _tracemalloc.c) 有助于更好地理解内部结构。
关于sys.setswitchinterval(1000)
:这仅与遍历Python字节码并处理它有关。这基本上就是文件ceval.c中PyEval_EvalFrameEx
中CPython的主循环。在那里你会找到这样的部分:
if (_Py_atomic_load_relaxed(&gil_drop_request))
...
文件ceval_gil.h.
中涵盖了所有具有切换间隔的逻辑设置高的切换间隔只是意味着PyEval_EvalFrameEx
中的主循环不会被中断更长的时间。这并不意味着没有其他可能性可以同时释放 GIL 并且另一个线程可以 运行.
PyEval_EvalFrameEx
将执行 Python 字节码。让我们假设这会调用 time.sleep(1)
。这将调用该函数的本机 C 实现。您会在文件 timemodule.c 的 time_sleep()
中找到它。如果您遵循该代码,您会发现:
Py_BEGIN_ALLOW_THREADS
err = select(0, (fd_set *)0, (fd_set *)0, (fd_set *)0, &timeout);
Py_END_ALLOW_THREADS
因此,GIL 也同时发布了。现在,任何其他正在等待 GIL 的线程都可以获取它和 运行 其他 Python 代码。
理论上,您可能会想,如果您设置一个高切换间隔并且从不调用任何 Python 代码,这反过来可能会在某个时候释放 GIL,您将是安全的。请注意,这几乎是不可能的。例如。 GC 会不时被调用,某些对象的任何 __del__
都可能产生各种副作用。