为什么 concurrent.futures.ProcessPoolExecutor() 跳过迭代?

Why is concurrent.futures.ProcessPoolExecutor() skipping iterations?

我有一个使用 concurrent.futures.ProcessPoolExecutor() 并行化的函数,如下所示:

with concurrent.futures.ProcessPoolExecutor() as executor:
    executor.map(my_func, ids, it.repeat(num_ids))

其中 ids 是由两个元素组成的元组列表。第一个元素包含一个整数,对于每个后续元组,该整数递增 1。我用它来创建一个 'iteration progress tracker'。第二个元素包含一个输入 my_func uses.

my_func 太长了,无法在此处添加,而且我无法获得具有互惠行为的 MRE。然而,它看起来像这样:

def my_func(id, num_ids):
    print(f"{id[0]} of {num_ids}")
    # Extract something from a database, transform it and then add the new data back into the database

在一个 运行 上,我注意到在迭代跟踪器上大约 5k 时它突然跳到 10k,停顿了一下然后继续前进。之后,它每隔一段时间就会跳过一些记录。如果我再次 运行 代码,此模式会重复,但每次跳过的位置都略有不同。

我在 VS Code 中正式进入调试模式,但令我惊讶的是,当 运行 从调试器中调用代码时,没有记录被跳过。没有错误,什么都没有。

我发现在调试器之外停止跳过的唯一方法是将 max_workers 参数设置为我的线程的一半。

我知道如果没有 MRE,这很难诊断,但我希望其他人可能遇到过这个问题或认识到这些症状?

鉴于所提供的信息,很难回答具体案例。
我最好的猜测是 my_func.
的正文中发生异常 实际上,如果发生异常,则不会打印任何内容,执行也不会终止。
为了验证我的假设,我将定义一个装饰器来打印函数中发生的异常:

import functools

def log_function(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            print(args, kwargs, repr(exc))

    return wrapper

并将装饰器应用于函数。

@log_function
def my_func(my_id, num_ids):
   ...