Python 中 ThreadPoolExecutor 上下文中的轻持久性

Light persistence in the context of ThreadPoolExecutor in Python

我有一些 Python 代码可以使用 ThreadPoolExecutor 分出昂贵的作业,我想跟踪其中哪些已经完成,这样如果我必须重新启动这个系统,我就不会必须重做已经完成的东西。在单线程上下文中,我可以只标记我在架子上所做的事情。这是将这个想法移植到多线程环境中的一个天真的端口:

from concurrent.futures import ThreadPoolExecutor
import subprocess
import shelve


def do_thing(done, x):
    # Don't let the command run in the background; we want to be able to tell when it's done
    _ = subprocess.check_output(["some_expensive_command", x])
    done[x] = True


futs = []
with shelve.open("done") as done:
    with ThreadPoolExecutor(max_workers=18) as executor:
        for x in things_to_do:
            if done.get(x, False):
                continue
            futs.append(executor.submit(do_thing, done, x))
            # Can't run `done[x] = True` here--have to wait until do_thing finishes
        for future in futs:
            future.result()

    # Don't want to wait until here to mark stuff done, as the whole system might be killed at some point
    # before we get through all of things_to_do

我能逃脱惩罚吗? documentation for shelve 不包含任何关于线程安全的保证,所以我认为不。

那么处理这个问题的简单方法是什么?我认为也许将 done[x] = True 粘贴到 future.add_done_callback 中就可以了,但是 that will often run in the same thread as the future itself。也许有一种锁定机制可以很好地与 ThreadPoolExecutor 配合使用?对我来说,编写一个休眠循环然后检查已完成的 futures 似乎更干净。

虽然您仍在最外层的 with 上下文管理器中,但 done 搁置只是一个普通的 python 对象 - 它仅在上下文时写入磁盘管理器关闭,它 运行 是它的 __exit__ 方法。因此,由于 GIL(只要您使用的是 CPython),它与任何其他 python 对象一样线程安全。

具体来说,重新分配 done[x] = True 是线程安全的/将以原子方式完成。

重要的是要注意,虽然搁置的 __exit__ 方法将在 Ctrl-C 后 运行,但如果 python 过程突然结束,则不会,搁置获胜'保存到磁盘。

为了防止这种故障,我建议使用轻量级的基于文件的线程安全数据库,例如 sqllite3