Hung cells: running multiple jupyter notebooks in parallel with papermill
I am trying to run Jupyter notebooks in parallel by starting them from another notebook. I'm using papermill to save the output from the notebooks.
In my scheduler.ipynb I'm using multiprocessing, which some people have had success with. I create processes from a base notebook, and this always seems to work the first time it's run. I can run 3 notebooks with sleep 10 in 13 seconds. If a subsequent cell attempts to run the exact same thing, the processes it spawns (multiple notebooks) hang indefinitely. I've tried adding code to make sure the spawned processes have exit codes and have completed, and even calling terminate on them once they're done - no luck, my second attempt never completes.
If I do:
sean@server:~$ ps aux | grep ipython
root 2129 0.1 0.2 1117652 176904 ? Ssl 19:39 0:05 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-eee374ff-0760-4490-8ed0-db03fed84f0c.json
root 3418 0.1 0.2 1042076 173652 ? Ssl 19:42 0:03 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-3e2f09e8-969f-41c9-81cc-acd2ec4e3d54.json
root 4332 0.1 0.2 1042796 174896 ? Ssl 19:44 0:04 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-bbd4575c-109a-4fb3-b6ed-372beb27effd.json
root 17183 0.2 0.2 995344 145872 ? Ssl 20:26 0:02 /opt/conda/anaconda/bin/python /opt/conda/anaconda/bin/ipython kernel -f /root/.local/share/jupyter/runtime/kernel-27c48eb1-16b4-4442-9574-058283e48536.json
I see that there appear to be 4 kernels running (4 processes). When I look at the running notebooks, I see there are 6 running notebooks. This seems to be supported by the documentation, where a few kernels can service multiple notebooks: "A kernel process can be connected to more than one frontend simultaneously."
However, I suspect that because the ipython kernels keep running, something bad is happening where the spawned processes are never reaped? Some say it's not possible using multiprocessing. Others have described the same problem.
import re
import os
import multiprocessing
from os.path import isfile
from datetime import datetime
import papermill as pm
import nbformat
# avoid "RuntimeError: This event loop is already running"
# it seems papermill used to support this but it is now undocumented:
# papermill.execute_notebook(nest_asyncio=True)
import nest_asyncio
nest_asyncio.apply()
import company.config
# # Supporting Functions
# In[ ]:
def get_papermill_parameters(notebook,
                             notebook_prefix='/mnt/jupyter',
                             notebook_suffix='.ipynb'):
    if isinstance(notebook, list):
        notebook_path = notebook[0]
        parameters = notebook[1]
        tag = '_' + notebook[2] if notebook[2] is not None else None
    else:
        notebook_path = notebook
        parameters = None
        tag = ''
    basename = os.path.basename(notebook_path)
    dirpath = re.sub(basename + '$', '', notebook_path)
    this_notebook_suffix = notebook_suffix if not re.search(notebook_suffix + '$', basename) else ''
    input_notebook = notebook_prefix + notebook_path + this_notebook_suffix
    scheduler_notebook_dir = notebook_prefix + dirpath + 'scheduler/'
    if not os.path.exists(scheduler_notebook_dir):
        os.makedirs(scheduler_notebook_dir)
    output_notebook = scheduler_notebook_dir + basename
    return input_notebook, output_notebook, this_notebook_suffix, parameters, tag
# In[ ]:
def add_additional_imports(input_notebook, output_notebook, current_datetime):
    notebook_name = os.path.basename(output_notebook)
    notebook_dir = re.sub(notebook_name, '', output_notebook)
    temp_dir = notebook_dir + current_datetime + '/temp/'
    results_dir = notebook_dir + current_datetime + '/'
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)
    if not os.path.exists(results_dir):
        os.makedirs(results_dir)
    updated_notebook = temp_dir + notebook_name
    first_cell = nbformat.v4.new_code_cell("""
import import_ipynb
import sys
sys.path.append('/mnt/jupyter/lib')""")
    metadata = {"kernelspec": {"display_name": "PySpark", "language": "python", "name": "pyspark"}}
    existing_nb = nbformat.read(input_notebook, nbformat.current_nbformat)
    cells = existing_nb.cells
    cells.insert(0, first_cell)
    new_nb = nbformat.v4.new_notebook(cells=cells, metadata=metadata)
    nbformat.write(new_nb, updated_notebook, nbformat.current_nbformat)
    output_notebook = results_dir + notebook_name
    return updated_notebook, output_notebook
# In[ ]:
# define this function so it is easily passed to multiprocessing
def run_papermill(input_notebook, output_notebook, parameters):
    pm.execute_notebook(input_notebook, output_notebook, parameters, log_output=True)
# # Run All of the Notebooks
# In[ ]:
def run(notebooks, run_hour_utc=10, scheduler=True, additional_imports=False,
        parallel=False, notebook_prefix='/mnt/jupyter'):
    """
    Run provided list of notebooks on a schedule or on demand.
    Args:
      notebooks (list): a list of notebooks to run
      run_hour_utc (int): hour to run notebooks at
      scheduler (boolean): when set to True (default value) notebooks will run at run_hour_utc.
                           when set to False notebooks will run on demand.
      additional_imports (boolean): set to True if you need to add additional imports into your notebook
      parallel (boolean): whether to run the notebooks in parallel
      notebook_prefix (str): path to jupyter notebooks
    """
    if not scheduler or datetime.now().hour == run_hour_utc:  # Only run once a day on an hourly cron job.
        now = datetime.today().strftime('%Y-%m-%d_%H%M%S')
        procs = []
        notebooks_base_url = company.config.cluster['resources']['daedalus']['notebook'] + '/notebooks'
        if parallel and len(notebooks) > 10:
            raise Exception(f"You are trying to run {len(notebooks)}. We recommend a maximum of 10 be run at once.")
        for notebook in notebooks:
            input_notebook, output_notebook, this_notebook_suffix, parameters, tag = get_papermill_parameters(notebook, notebook_prefix)
            if is_interactive_notebook(input_notebook):
                print(f"Not running Notebook '{input_notebook}' because it's marked interactive-only.")
                continue
            if additional_imports:
                input_notebook, output_notebook = add_additional_imports(input_notebook, output_notebook, now)
            else:
                output_notebook = output_notebook + tag + '_' + now + this_notebook_suffix
            print(f"Running Notebook: '{input_notebook}'")
            print(" - Parameters: " + str(parameters))
            print(f"Saving Results to: '{output_notebook}'")
            print("Link: " + re.sub(notebook_prefix, notebooks_base_url, output_notebook))
            if not os.path.isfile(input_notebook):
                print(f"ERROR! Notebook file does not exist: '{input_notebook}'")
            else:
                try:
                    if parameters is not None:
                        parameters.update({'input_notebook': input_notebook, 'output_notebook': output_notebook})
                    if parallel:
                        # trailing comma in args is in documentation for multiprocessing - it seems to matter
                        proc = multiprocessing.Process(target=run_papermill, args=(input_notebook, output_notebook, parameters,))
                        print("starting process")
                        proc.start()
                        procs.append(proc)
                    else:
                        run_papermill(input_notebook, output_notebook, parameters)
                except Exception as ex:
                    print(ex)
                    print(f"ERROR! See full error in: '{output_notebook}'\n\n")
            if additional_imports:
                temp_dir = re.sub(os.path.basename(input_notebook), '', input_notebook)
                if os.path.exists(temp_dir):
                    os.system(f"rm -rf '{temp_dir}'")
        if procs:
            print("joining")
            for proc in procs:
                proc.join()
        if procs:
            print("terminating")
            for proc in procs:
                print(proc.is_alive())
                print(proc.exitcode)
                proc.terminate()
        print(f"Done: Processed all {len(notebooks)} notebooks.")
    else:
        print(f"Waiting until {run_hour_utc}:00:00 UTC to run.")
I am using python==3.6.12 and papermill==2.2.2
jupyter core : 4.7.0
jupyter-notebook : 5.5.0
ipython : 7.16.1
ipykernel : 5.3.4
jupyter client : 6.1.7
ipywidgets : 7.2.1
Have you tried using the subprocess module? It seems like a better option for you than multiprocessing. It allows you to asynchronously spawn sub-processes that run in parallel, and it can be used to invoke commands and programs as if you were using a shell. I find it really useful for writing python scripts instead of bash scripts.
So you could use your main notebook to run the other notebooks as independent sub-processes in parallel with subprocess.run(your_function_with_papermill).
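For what it's worth, here is a minimal sketch of that idea, assuming the papermill command-line interface is installed on the server; the notebook paths are made up for illustration. It launches each notebook as an independent OS process with subprocess.Popen (which, unlike subprocess.run, returns immediately) and then waits for all of them:

import subprocess

# Hypothetical input/output notebook paths, for illustration only.
jobs = [
    ("/mnt/jupyter/notebook_a.ipynb", "/mnt/jupyter/scheduler/notebook_a_out.ipynb"),
    ("/mnt/jupyter/notebook_b.ipynb", "/mnt/jupyter/scheduler/notebook_b_out.ipynb"),
]

# Start every notebook as a separate papermill process; Popen does not block,
# so the notebooks execute in parallel.
procs = [subprocess.Popen(["papermill", "--log-output", in_nb, out_nb])
         for in_nb, out_nb in jobs]

# Wait for all of them to finish and report their exit codes.
for proc in procs:
    proc.wait()
    print(proc.args, "exit code:", proc.returncode)

Because each papermill invocation runs in its own interpreter process, it creates and tears down its own kernel and exits on its own, which should leave nothing for the scheduler notebook's kernel to reap between runs.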