我应该使用事件、信号量、锁、条件或其组合来管理安全退出我的多线程 Python 程序吗?
Should I use Events, Semaphores, Locks, Conditions, or a combination thereof to manage safely exiting my multithreaded Python program?
我正在编写一个多线程 python 程序,其中主线程和它产生的其他线程 运行 作为守护进程(但不是 Thread.daemon=True)寻找某些某些目录中的文件,并在它们存在时对它们执行操作。有可能 one/any 个线程发生错误,需要整个程序退出。但是,我需要其他线程在退出前完成它们当前的工作。
据我了解,如果我为生成的线程设置 myThread.daemon=True,它们将在主线程退出时立即自动退出。但是,我希望其他线程在退出之前完成它们当前的工作(除非错误是某种灾难性的失败,在这种情况下,我可能无论如何都会退出所有内容,安全与否)。因此,我没有将线程的守护进程 属性 设置为 True。
查看线程模块文档和我可用的各种对象,例如事件、信号量、条件和锁,我不确定处理我的情况的最佳方法。此外,当程序由于 SIGTERM/SIGINT 信号而需要终止时,我不确定如何处理这种情况。
一些代码说明了我的程序结构的简化版本:
import threading
import signals
import glob
import time
class MyThread1( threading.thread ):
def __init__( self, name='MyThread1' ):
threading.Thread.__init__( self )
self.name = name
return
def run( self ):
while True:
filePathList = glob.glob( thisThreadDir + '/*.txt' )
for file in filePathList:
try:
doSomeProcessing( file )
# Then move the file to another thread's dir
# or potentially create a new file that will
# be picked up by another thread
except:
# Need to potentially tell all other threads
# to finish task and exit depending on error
# I assume this would be the place to check for some kind of
# flag or other indication to terminate the thread?
time.sleep( 30 )
# Now imagine a few more custom threads with the same basic structure,
# except that what is happening in doSomeProcessing() will obviously vary
# Main Thread/Script
def sigintHandler( SIGINT, frame ):
# How do I handle telling all threads to finish their current loop
# and then exit safely when I encounter this signal?
sys.exit( 1 )
def sigtermHandler( SIGTERM, frame ):
# Same question for this signal handler
sys.exit( 1 )
signal.signal( signal.SIGINT, sigintHandler )
signal.signal( signal.SIGTERM, sigtermHandler )
myOtherThread1 = MyThread1()
myOtherThreadN = MyThreadN()
myOtherThread1.start()
myOtherThreadN.start()
while True:
filePathList = glob.glob( mainDir + '/*.txt' )
for file in filePathList:
try:
doMainProcessing( file )
# Move file or write a new one in another thread's dir
except:
# Again, potentially need to exit the whole program, but want
# the other threads to finish their current loop first
# Check if another thread told us we need to exit?
time.sleep( 30 )
我会使用 Event
向线程发出它应该退出的信号:
- 在
__init__
中创建一个事件
- 在
run()
中使用事件的 wait()
sleep
并检查何时退出
- 从外部设置事件以停止线程
为了在一个线程中处理异常,我会用一个try
/except
块来包围它所做的一切。当捕获到某些东西时,存储异常(and/or 您需要的任何其他信息),清理并退出线程。
在外面,在主线程中,检查所有线程中的存储异常,如果发现任何异常,则通知所有线程退出。
要在主线程中处理异常(也包括SIGINT
),在那里有一个try
/except
块并发出信号停止所有线程。
全部加起来,加上虚拟异常和调试打印:
import threading
import time
class MyThread(threading.Thread):
def __init__(self):
super().__init__()
self.stop_requested = threading.Event()
self.exception = None
def run(self):
try:
# sleep for 1 second, or until stop is requested, stop if stop is requested
while not self.stop_requested.wait(1):
# do your thread thing here
print('in thread {}'.format(self))
# simulate a random exception:
import random
if random.randint(0, 50) == 42:
1 / 0
except Exception as e:
self.exception = e
# clean up here
print('clean up thread {}'.format(self))
def stop(self):
# set the event to signal stop
self.stop_requested.set()
# create and start some threads
threads = [MyThread(), MyThread(), MyThread(), MyThread()]
for t in threads:
t.start()
# main thread looks at the status of all threads
try:
while True:
for t in threads:
if t.exception:
# there was an error in a thread - raise it in main thread too
# this will stop the loop
raise t.exception
time.sleep(0.2)
except Exception as e:
# handle exceptions any way you like, or don't
# This includes exceptions in main thread as well as those in other threads
# (because of "raise t.exception" above)
print(e)
finally:
print('clan up everything')
for t in threads:
# threads will know how to clean up when stopped
t.stop()
我正在编写一个多线程 python 程序,其中主线程和它产生的其他线程 运行 作为守护进程(但不是 Thread.daemon=True)寻找某些某些目录中的文件,并在它们存在时对它们执行操作。有可能 one/any 个线程发生错误,需要整个程序退出。但是,我需要其他线程在退出前完成它们当前的工作。
据我了解,如果我为生成的线程设置 myThread.daemon=True,它们将在主线程退出时立即自动退出。但是,我希望其他线程在退出之前完成它们当前的工作(除非错误是某种灾难性的失败,在这种情况下,我可能无论如何都会退出所有内容,安全与否)。因此,我没有将线程的守护进程 属性 设置为 True。
查看线程模块文档和我可用的各种对象,例如事件、信号量、条件和锁,我不确定处理我的情况的最佳方法。此外,当程序由于 SIGTERM/SIGINT 信号而需要终止时,我不确定如何处理这种情况。
一些代码说明了我的程序结构的简化版本:
import threading
import signals
import glob
import time
class MyThread1( threading.thread ):
def __init__( self, name='MyThread1' ):
threading.Thread.__init__( self )
self.name = name
return
def run( self ):
while True:
filePathList = glob.glob( thisThreadDir + '/*.txt' )
for file in filePathList:
try:
doSomeProcessing( file )
# Then move the file to another thread's dir
# or potentially create a new file that will
# be picked up by another thread
except:
# Need to potentially tell all other threads
# to finish task and exit depending on error
# I assume this would be the place to check for some kind of
# flag or other indication to terminate the thread?
time.sleep( 30 )
# Now imagine a few more custom threads with the same basic structure,
# except that what is happening in doSomeProcessing() will obviously vary
# Main Thread/Script
def sigintHandler( SIGINT, frame ):
# How do I handle telling all threads to finish their current loop
# and then exit safely when I encounter this signal?
sys.exit( 1 )
def sigtermHandler( SIGTERM, frame ):
# Same question for this signal handler
sys.exit( 1 )
signal.signal( signal.SIGINT, sigintHandler )
signal.signal( signal.SIGTERM, sigtermHandler )
myOtherThread1 = MyThread1()
myOtherThreadN = MyThreadN()
myOtherThread1.start()
myOtherThreadN.start()
while True:
filePathList = glob.glob( mainDir + '/*.txt' )
for file in filePathList:
try:
doMainProcessing( file )
# Move file or write a new one in another thread's dir
except:
# Again, potentially need to exit the whole program, but want
# the other threads to finish their current loop first
# Check if another thread told us we need to exit?
time.sleep( 30 )
我会使用 Event
向线程发出它应该退出的信号:
- 在
__init__
中创建一个事件
- 在
run()
中使用事件的wait()
sleep
并检查何时退出 - 从外部设置事件以停止线程
为了在一个线程中处理异常,我会用一个try
/except
块来包围它所做的一切。当捕获到某些东西时,存储异常(and/or 您需要的任何其他信息),清理并退出线程。
在外面,在主线程中,检查所有线程中的存储异常,如果发现任何异常,则通知所有线程退出。
要在主线程中处理异常(也包括SIGINT
),在那里有一个try
/except
块并发出信号停止所有线程。
全部加起来,加上虚拟异常和调试打印:
import threading
import time
class MyThread(threading.Thread):
def __init__(self):
super().__init__()
self.stop_requested = threading.Event()
self.exception = None
def run(self):
try:
# sleep for 1 second, or until stop is requested, stop if stop is requested
while not self.stop_requested.wait(1):
# do your thread thing here
print('in thread {}'.format(self))
# simulate a random exception:
import random
if random.randint(0, 50) == 42:
1 / 0
except Exception as e:
self.exception = e
# clean up here
print('clean up thread {}'.format(self))
def stop(self):
# set the event to signal stop
self.stop_requested.set()
# create and start some threads
threads = [MyThread(), MyThread(), MyThread(), MyThread()]
for t in threads:
t.start()
# main thread looks at the status of all threads
try:
while True:
for t in threads:
if t.exception:
# there was an error in a thread - raise it in main thread too
# this will stop the loop
raise t.exception
time.sleep(0.2)
except Exception as e:
# handle exceptions any way you like, or don't
# This includes exceptions in main thread as well as those in other threads
# (because of "raise t.exception" above)
print(e)
finally:
print('clan up everything')
for t in threads:
# threads will know how to clean up when stopped
t.stop()