Logging TypeError in Dask compute method
Here is the example Python code, saved as dask_multithread_demo.py:
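import logging

import dask
import dask.bag as db

log = logging.getLogger(__name__)

def dask_function(input_list):
    # Log every element of the bag; the lambda captures the module-level logger.
    db.from_sequence(input_list) \
        .map(lambda x: log.info(f"showing x: {x}")) \
        .compute()

def main():
    # First call: the tasks run in the calling thread.
    dask.config.set(scheduler='single-threaded')
    dask_function(["abc"])
    # Second call: the tasks are serialized and sent to worker processes.
    dask.config.set(scheduler='multiprocessing')
    dask_function(["abc"])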
When I run the main method, the first call to dask_function raises no exception.
When execution reaches the second call to dask_function, I get the following exception:
  File "dask_multithread_demo.py", line 18, in main
    dask_function(["abc"])
  File "dask_multithread_demo.py", line 10, in dask_function
    .map(lambda x: log.info(f"showing x: {x}")) \
  File "lib/python3.6/site-packages/dask/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "lib/python3.6/site-packages/dask/base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "lib/python3.6/site-packages/dask/multiprocessing.py", line 222, in get
    **kwargs
  File "lib/python3.6/site-packages/dask/local.py", line 489, in get_async
    raise_exception(exc, tb)
  File "lib/python3.6/site-packages/dask/local.py", line 318, in reraise
    raise exc.with_traceback(tb)
  File "lib/python3.6/site-packages/dask/local.py", line 224, in execute_task
    task, data = loads(task_info)
TypeError: get_logger() missing 1 required positional argument: 'name'
get_logger() missing 1 required positional argument: 'name'
Here is my question: how can I log inside dask_function with the scheduler set to 'multiprocessing' without getting the TypeError exception?
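I think that when you use the multiprocessing scheduler, you can't share variables between processes the way you can in a single-threaded context. The traceback fails at loads(task_info), that is, while the worker process is deserializing the task, which suggests the logger captured by the lambda did not survive the trip to the worker. See: https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes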
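One way to avoid shipping a logger object between processes is to look the logger up by name inside the function that runs on the worker. The following is only a minimal sketch of that idea, not a confirmed fix for this exact traceback: the names log_item and the logger name "dask_multithread_demo" are made up for illustration, and the logging.basicConfig call assumes the worker process starts with no logging configured.

```python
import logging


def log_item(x):
    """Log one element from inside the process that runs the task."""
    # basicConfig is a no-op if the root logger already has handlers, so it is
    # safe to call once per task (assumption: workers start unconfigured).
    logging.basicConfig(level=logging.INFO)
    # Fetch the logger by name here instead of capturing a logger object.
    logging.getLogger("dask_multithread_demo").info("showing x: %s", x)
    return x


def dask_function(input_list, scheduler):
    # Imported lazily so log_item can be tried without dask installed.
    import dask.bag as db

    # Passing scheduler= to compute() selects the scheduler for this call only.
    return db.from_sequence(input_list).map(log_item).compute(scheduler=scheduler)
```

With the multiprocessing scheduler, dask_function(["abc"], "multiprocessing") should be called from under an if __name__ == "__main__": guard, as with any code that starts worker processes.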
You may want to look at the debugging page in the Dask documentation for general ideas: https://docs.dask.org/en/stable/how-to/debug.html#debug
More specifically, see the section on logs: https://docs.dask.org/en/stable/how-to/debug.html#logs
Personally, I have found the rerun_exceptions_locally argument valuable for quickly debugging errors that occur on Dask workers: https://docs.dask.org/en/stable/how-to/debug.html#rerun-failed-task-locally
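As a rough illustration of that argument, the helper below passes rerun_exceptions_locally=True to compute() on any Dask collection. The helper name compute_with_local_rerun is hypothetical, and the sketch assumes the single-machine schedulers accept the keyword as described on the linked page.

```python
def compute_with_local_rerun(collection, scheduler="multiprocessing"):
    """Compute a Dask collection, re-running a failed task in the calling process."""
    # rerun_exceptions_locally is forwarded to the single-machine scheduler,
    # which re-executes the failing task locally so a debugger can reach it.
    return collection.compute(scheduler=scheduler, rerun_exceptions_locally=True)
```

For the bag in the question, this would be called as compute_with_local_rerun(db.from_sequence(["abc"]).map(some_function)), where some_function stands for whatever is being mapped.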
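In short: logging (and debugging in general) is not trivial in a distributed environment. Exceptions often end up killing the worker process, so the stack trace or logs may never make it back to you.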