ThreadPoolExecutor finishing before all threads are actually finished

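I have 28 methods running in a pool, so ThreadPoolExecutor creates 28 threads in total. (ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.) While the threads run, I use Plotly to generate some charts. The problem is that ThreadPoolExecutor finishes before all the threads have actually finished: I always end up with 4 charts (threads) that are never created (never finish). Here is my code: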

from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=len(methods))

for method in methods:
    pool.submit(method, commits, system_name, reset, chart_output)

pool.shutdown(wait=True)
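Note that shutdown(wait=True) does wait for every submitted call to finish. However, if a call raises, the exception is stored on the Future that submit() returned and is never printed, so a failed chart looks like a thread that "did not finish". A minimal diagnostic sketch, assuming hypothetical stand-in methods (make_task, method_N) where two of them fail on purpose:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def make_task(i):
    # Hypothetical stand-in for one chart-producing method.
    # Tasks 5 and 12 raise on purpose to simulate a failing chart.
    def task():
        if i in (5, 12):
            raise RuntimeError(f"chart {i} failed")
        return f"chart {i} done"
    task.__name__ = f"method_{i}"
    return task

methods = [make_task(i) for i in range(28)]

failures = {}
with ThreadPoolExecutor(max_workers=len(methods)) as pool:
    # Keep every Future so its outcome can be inspected afterwards.
    futures = {pool.submit(m): m.__name__ for m in methods}
    for future in as_completed(futures):
        exc = future.exception()  # None if the call returned normally
        if exc is not None:
            failures[futures[future]] = exc

for name, exc in sorted(failures.items()):
    print(f"{name} raised {exc!r}")
```

In the real code, the same loop over the Futures would reveal whether the missing charts correspond to methods that raised.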

The methods being executed look like this:

import plotly.graph_objects as go

# reset_db_data, retrieve_db_data and save_df_to_db are the asker's own helpers (not shown).

def commits_by_date(commits, system_name, reset, chart_output):
    collection_name = "commits_by_date"
    reset_db_data(reset, system_name, collection_name)
    date_commits = retrieve_db_data(system_name, collection_name)

    # Nothing cached in the database yet: compute the per-date counts and store them.
    if len(date_commits) == 0:
        date_commits = commits.groupby('commit_date')[['sha']].count()
        date_commits = date_commits.rename(columns={'sha': 'commit_count'})
        date_commits.insert(0, "system_name", system_name)
        date_commits = date_commits.reset_index()
        save_df_to_db(date_commits, collection_name)

    if chart_output:
        fig = go.Figure([go.Scatter(
            x=date_commits.commit_date,
            y=date_commits.commit_count,
            text=date_commits.commit_count,
            fill='tozeroy')])
        fig.update_layout(
            title='Commits by Date',
            yaxis_title='Commits Count')
        fig.write_html('commits_by_date.html', auto_open=True)

One suggested answer is to use:

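import time

as_many_you_want = 1  # seconds; placeholder value, tune as needed

for method in methods:
    pool.submit(method, commits, system_name, reset, chart_output)
    time.sleep(as_many_you_want)  # pause before submitting the next method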

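It depends on what method is doing. When using concurrency, you must avoid shared mutable state. The functions you are trying to execute concurrently appear to access the Plotly graph, which is shared mutable state.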

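To avoid problems, the code you run concurrently should be reentrant, and the parts of the code that mutate shared state should run synchronously (one at a time).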
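As an alternative to splitting the function, a threading.Lock is one common way to make the shared-state part run one thread at a time. A minimal sketch, assuming a plain list (shared_figure) as a hypothetical stand-in for the shared plot object; compute and worker are also hypothetical:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

shared_figure = []            # hypothetical stand-in for a shared, mutable figure
figure_lock = threading.Lock()

def compute(n):
    # Reentrant part: uses only its argument and local variables.
    return sum(i * i for i in range(n))

def worker(n):
    result = compute(n)       # runs in parallel with other workers
    # Shared-state part: only one thread at a time may hold the lock,
    # so updates to shared_figure never interleave.
    with figure_lock:
        shared_figure.append((n, result))

with ThreadPoolExecutor(max_workers=8) as pool:
    for n in range(1, 29):
        pool.submit(worker, n)

print(len(shared_figure))  # 28
```

The lock keeps the heavy computation concurrent while serializing only the short section that touches the shared object.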

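One way to achieve this is to split method into two functions: the first does the heavy work you want to parallelize (and must be reentrant), and the second plots the results synchronously.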

Here is an example of how to do this with Python's concurrent.futures module:

from concurrent.futures import ThreadPoolExecutor, as_completed

def heavy_work(arg):
    # Some heavy work...
    result = slow_function(arg)
    return result

def plot(result, figure):
    # Plot the result to a shared figure,
    # must be executed synchronously.
    figure.plot(result)

args = [...]  # List of arguments to `heavy_work`
figure = ...  # The shared figure

# Submit work to be executed concurrently
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(heavy_work, arg) for arg in args]

# Serialize the calls to `plot`
for future in as_completed(futures):
    result = future.result()
    plot(result, figure)
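A runnable variant of the example above, with hypothetical inputs (jobs) and a dict standing in for the shared figure. Here the as_completed loop sits inside the with block, so each result is plotted in the main thread as soon as it arrives rather than after all work is done; future.result() also re-raises any exception from a worker instead of silently dropping it:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def heavy_work(method_name, n):
    # Hypothetical reentrant computation: uses only its arguments and locals.
    data = [i * i for i in range(n)]
    return method_name, data

def plot(figure, method_name, data):
    # Hypothetical stand-in for adding a trace to a shared figure.
    # Only ever called from the main thread, so no lock is needed.
    figure[method_name] = data

jobs = {f"method_{k}": k + 1 for k in range(28)}  # hypothetical inputs
figure = {}

with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(heavy_work, name, n) for name, n in jobs.items()]
    # Consume results in the main thread as they complete.
    for future in as_completed(futures):
        name, data = future.result()  # re-raises a worker's exception, if any
        plot(figure, name, data)

print(len(figure))  # 28
```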