Python 中 ThreadPoolExecutor 上下文中的轻持久性
Light persistence in the context of ThreadPoolExecutor in Python
我有一些 Python 代码可以使用 ThreadPoolExecutor 分出昂贵的作业,我想跟踪其中哪些已经完成,这样如果我必须重新启动这个系统,我就不会必须重做已经完成的东西。在单线程上下文中,我可以只标记我在架子上所做的事情。这是将这个想法移植到多线程环境中的一个天真的端口:
from concurrent.futures import ThreadPoolExecutor
import subprocess
import shelve
def do_thing(done, x):
# Don't let the command run in the background; we want to be able to tell when it's done
_ = subprocess.check_output(["some_expensive_command", x])
done[x] = True
futs = []
with shelve.open("done") as done:
with ThreadPoolExecutor(max_workers=18) as executor:
for x in things_to_do:
if done.get(x, False):
continue
futs.append(executor.submit(do_thing, done, x))
# Can't run `done[x] = True` here--have to wait until do_thing finishes
for future in futs:
future.result()
# Don't want to wait until here to mark stuff done, as the whole system might be killed at some point
# before we get through all of things_to_do
我能逃脱惩罚吗? documentation for shelve 不包含任何关于线程安全的保证,所以我认为不。
那么处理这个问题的简单方法是什么?我认为也许将 done[x] = True
粘贴到 future.add_done_callback
中就可以了,但是 that will often run in the same thread as the future itself。也许有一种锁定机制可以很好地与 ThreadPoolExecutor 配合使用?对我来说,编写一个休眠循环然后检查已完成的 futures 似乎更干净。
虽然您仍在最外层的 with
上下文管理器中,但 done
搁置只是一个普通的 python 对象 - 它仅在上下文时写入磁盘管理器关闭,它 运行 是它的 __exit__
方法。因此,由于 GIL(只要您使用的是 CPython),它与任何其他 python 对象一样线程安全。
具体来说,重新分配 done[x] = True
是线程安全的/将以原子方式完成。
重要的是要注意,虽然搁置的 __exit__
方法将在 Ctrl-C 后 运行,但如果 python 过程突然结束,则不会,搁置获胜'保存到磁盘。
为了防止这种故障,我建议使用轻量级的基于文件的线程安全数据库,例如 sqllite3。
我有一些 Python 代码可以使用 ThreadPoolExecutor 分出昂贵的作业,我想跟踪其中哪些已经完成,这样如果我必须重新启动这个系统,我就不会必须重做已经完成的东西。在单线程上下文中,我可以只标记我在架子上所做的事情。这是将这个想法移植到多线程环境中的一个天真的端口:
from concurrent.futures import ThreadPoolExecutor
import subprocess
import shelve
def do_thing(done, x):
# Don't let the command run in the background; we want to be able to tell when it's done
_ = subprocess.check_output(["some_expensive_command", x])
done[x] = True
futs = []
with shelve.open("done") as done:
with ThreadPoolExecutor(max_workers=18) as executor:
for x in things_to_do:
if done.get(x, False):
continue
futs.append(executor.submit(do_thing, done, x))
# Can't run `done[x] = True` here--have to wait until do_thing finishes
for future in futs:
future.result()
# Don't want to wait until here to mark stuff done, as the whole system might be killed at some point
# before we get through all of things_to_do
我能逃脱惩罚吗? documentation for shelve 不包含任何关于线程安全的保证,所以我认为不。
那么处理这个问题的简单方法是什么?我认为也许将 done[x] = True
粘贴到 future.add_done_callback
中就可以了,但是 that will often run in the same thread as the future itself。也许有一种锁定机制可以很好地与 ThreadPoolExecutor 配合使用?对我来说,编写一个休眠循环然后检查已完成的 futures 似乎更干净。
虽然您仍在最外层的 with
上下文管理器中,但 done
搁置只是一个普通的 python 对象 - 它仅在上下文时写入磁盘管理器关闭,它 运行 是它的 __exit__
方法。因此,由于 GIL(只要您使用的是 CPython),它与任何其他 python 对象一样线程安全。
具体来说,重新分配 done[x] = True
是线程安全的/将以原子方式完成。
重要的是要注意,虽然搁置的 __exit__
方法将在 Ctrl-C 后 运行,但如果 python 过程突然结束,则不会,搁置获胜'保存到磁盘。
为了防止这种故障,我建议使用轻量级的基于文件的线程安全数据库,例如 sqllite3。