ThreadPoolExecutor finishing before all threads are actually finished
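I have 28 methods running in a pool. The ThreadPoolExecutor creates 28 threads in total (ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously). During thread execution I am generating some charts with Plotly. My problem is that the ThreadPoolExecutor finishes before all of the threads have actually finished: I always end up with 4 charts (threads) that are not created (not finished). Here is my code: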
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=len(methods))
for method in methods:
    pool.submit(method, commits, system_name, reset, chart_output)
pool.shutdown(wait=True)
The methods being executed look like this:
import plotly.graph_objects as go

def commits_by_date(commits, system_name, reset, chart_output):
    collection_name = "commits_by_date"
    reset_db_data(reset, system_name, collection_name)
    date_commits = retrieve_db_data(system_name, collection_name)
    # Compute and cache the per-date counts only if nothing is stored yet
    if len(date_commits) == 0:
        date_commits = commits.groupby('commit_date')[['sha']].count()
        date_commits = date_commits.rename(columns={'sha': 'commit_count'})
        date_commits.insert(0, "system_name", system_name)
        date_commits = date_commits.reset_index()
        save_df_to_db(date_commits, collection_name)
    if chart_output:
        fig = go.Figure([go.Scatter(
            x=date_commits.commit_date,
            y=date_commits.commit_count,
            text=date_commits.commit_count,
            fill='tozeroy')])
        fig.update_layout(
            title='Commits by Date',
            yaxis_title='Commits Count')
        fig.write_html('commits_by_date.html', auto_open=True)
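The answer is to use:
import time

for method in methods:
    pool.submit(method, commits, system_name, reset, chart_output)
    # Pause between submissions; as_many_you_want is a placeholder for a number of seconds
    time.sleep(as_many_you_want)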
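Before adding delays, it may be worth checking whether the missing charts come from tasks that raised an exception. pool.submit returns a Future, and an exception raised inside the worker is stored on that future and only re-raised when result() is called, so code that discards the futures never sees the failure. The following is a minimal sketch of that check, not a diagnosis of the code in the question; ok_task, failing_task and run_all are hypothetical stand-ins for the real methods.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def ok_task(name):
    # Hypothetical stand-in for a method that finishes normally.
    return f"{name} done"


def failing_task(name):
    # Hypothetical stand-in for a method that fails inside the worker thread.
    raise ValueError(f"{name} failed")


def run_all(tasks):
    """Run (callable, argument) pairs concurrently; return (results, errors)."""
    results, errors = [], []
    with ThreadPoolExecutor(max_workers=max(1, len(tasks))) as pool:
        # Keep the futures and remember which argument each one belongs to.
        futures = {pool.submit(func, arg): arg for func, arg in tasks}
        for future in as_completed(futures):
            try:
                # result() re-raises any exception raised by the task.
                results.append(future.result())
            except Exception as exc:
                errors.append((futures[future], repr(exc)))
    return results, errors


results, errors = run_all([(ok_task, "a"), (failing_task, "b"), (ok_task, "c")])
print(sorted(results))
print(errors)
```

If errors comes back non-empty for the real methods, the tracebacks point at the actual cause, which a sleep would only mask.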
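It depends on what method is doing. When using concurrency, you must avoid shared mutable state. The functions you are trying to run concurrently appear to access a plotly graph, which is shared mutable state.
To avoid problems, you should only write concurrent code that is reentrant, and the parts of the code that mutate shared state should be executed synchronously.
One way to achieve this is to split method into two functions: the first does the heavy work you want to parallelize (and must be reentrant), and the second plots the results synchronously.
Here is an example of how to do this with Python's concurrent.futures module: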
from concurrent.futures import ThreadPoolExecutor, as_completed

def heavy_work(arg):
    # Some heavy work...
    result = slow_function(arg)
    return result

def plot(result, figure):
    # Plot the result to a shared figure,
    # must be executed synchronously.
    figure.plot(result)

args = [...]  # List of arguments to `heavy_work`
figure = ...  # The shared figure

# Submit work to be executed concurrently
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(heavy_work, arg) for arg in args]

    # Serialize the calls to `plot`
    for future in as_completed(futures):
        result = future.result()
        plot(result, figure)
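If splitting the function in two is inconvenient, another common way to serialize only the part that touches shared state is to guard it with a threading.Lock. The sketch below is an illustration under assumptions rather than the approach from the answer above: a plain list (shared_points) stands in for the shared figure, and heavy_work and work_and_record are hypothetical placeholders.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical shared state: a plain list stands in for the shared figure.
shared_points = []
shared_lock = threading.Lock()


def heavy_work(arg):
    # Placeholder for the slow, reentrant computation.
    return arg * arg


def work_and_record(arg):
    result = heavy_work(arg)  # runs concurrently, touches no shared state
    with shared_lock:  # only one thread at a time mutates the shared list
        shared_points.append((arg, result))
    return result


with ThreadPoolExecutor(max_workers=4) as pool:
    # list() consumes the iterator, so any worker exception is raised here.
    squares = list(pool.map(work_and_record, range(8)))

print(squares)
print(sorted(shared_points))
```

The lock keeps the concurrent part as large as possible while still making each update to the shared object atomic. The trade-off compared with the two-function split is that the order in which threads reach the lock is not deterministic, hence the sorted() call when inspecting the result.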