Dask hanging when called from command prompt

I have a program that runs as expected in a Jupyter Notebook cell, but fails/hangs when I put it into a Python file and call it either from a Jupyter Notebook or from the command line.

Here is the test code:

    import pandas as pd
    df = pd.DataFrame({'col1':[1,2,3,4],'col2':[5,5,5,5]})

    def B(df):
        df['col3'] = df['col1'] + 100
        return df

    def A(df):
        from dask import delayed, compute
        import numpy as np
        from dask.distributed import Client, LocalCluster

        if __name__ == '__main__':
            cluster = LocalCluster(n_workers = 2)
            client = Client(cluster)

            results_dfs = []
            df_split = np.array_split(df, 2)

            for split in df_split:
                results_dfs.append(delayed(B)(split))

            result = delayed(pd.concat)(results_dfs)
            result = result.compute()

            client.close()
            cluster.close()
    
        return result
    
    result = A(df)
    result

Here is the expected result I get when running the typed-in code:

However, when I save this code in a file and then call it from a Jupyter Notebook using `%run -i "path-to-code\test.py"`, the cell runs forever.

When I try to call the same program file from the command prompt, I get the following error:

Traceback (most recent call last):  
  File "<string>", line 1, in <module>  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main  
    exitcode = _main(fd, parent_sentinel)  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\spawn.py", line 125, in _main  
    prepare(preparation_data)  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\spawn.py", line 236, in prepare  
    _fixup_main_from_path(data['init_main_from_path'])  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\spawn.py", line 287, in _fixup_main_from_path  
    main_content = runpy.run_path(main_path,  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\runpy.py", line 268, in run_path  
    return _run_module_code(code, init_globals, run_name,  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\runpy.py", line 97, in _run_module_code  
    _run_code(code, mod_globals, init_globals,  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\runpy.py", line 87, in _run_code  
    exec(code, run_globals)  
  File "C:\User\Desktop\Company Stuff\Code\test.py", line 31, in <module>  
    result = A(df)  
  File "C:\User\Desktop\Company Stuff\Code\test.py", line 29, in A  
    return result  
UnboundLocalError: local variable 'result' referenced before assignment  
Task exception was never retrieved  
future: <Task finished name='Task-12' coro=<_wrap_awaitable() done, defined at C:\User\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py:678> exception=ImportError("cannot import name 'Popen' from partially initialized module 'multiprocessing.popen_spawn_win32' (most likely due to a circular import) (C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\popen_spawn_win32.py)")>  
Traceback (most recent call last):  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 685, in _wrap_awaitable  
    return (yield from awaitable.__await__())  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\core.py", line 284, in _  
    await self.start()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\nanny.py", line 295, in start  
    response = await self.instantiate()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\nanny.py", line 378, in instantiate  
    result = await self.process.start()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\nanny.py", line 575, in start  
    await self.process.start()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\process.py", line 33, in _call_and_set_future  
    res = func(*args, **kwargs)  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\process.py", line 203, in _start  
    process.start()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\process.py", line 121, in start  
    self._popen = self._Popen(self)  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\context.py", line 224, in _Popen  
    return _default_context.get_context().Process._Popen(process_obj)  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\context.py", line 326, in _Popen  
    from .popen_spawn_win32 import Popen  
ImportError: cannot import name 'Popen' from partially initialized module   'multiprocessing.popen_spawn_win32' (most likely due to a circular import) (C:\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\popen_spawn_win32.py)  
tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: None, threads: 2>>  
Traceback (most recent call last):  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\tornado\ioloop.py", line 905, in _run  
    return self.callback()  
  File "C:\User\AppData\Local\Programs\Python\Python39\lib\site-packages\distributed\nanny.py", line 414, in memory_monitor  
    process = self.process.process  
AttributeError: 'NoneType' object has no attribute 'process'

The last 7 lines of the log then repeat endlessly until I interrupt the program. Why won't this program run when placed in its own file?

I am running Windows 10 with Python 3.9; here are the packages in the Python environment:

Package           Version
----------------- --------
asgiref           3.3.0
astroid           2.4.2
bokeh             2.2.3
click             7.1.2
cloudpickle       1.6.0
colorama          0.4.4
dask              2021.2.0
distributed       2021.2.0
Django            3.1.4
et-xmlfile        1.0.1
fsspec            0.8.5
HeapDict          1.0.1
isort             5.6.4
jdcal             1.4.1
Jinja2            2.11.2
lazy-object-proxy 1.4.3
locket            0.2.0
MarkupSafe        1.1.1
mccabe            0.6.1
msgpack           1.0.2
numpy             1.19.3
openpyxl          3.0.5
packaging         20.8
pandas            1.1.5
partd             1.1.0
Pillow            8.0.1
pip               21.0.1
psutil            5.8.0
pylint            2.6.0
pyparsing         2.4.7
python-dateutil   2.8.1
pytz              2020.4
PyYAML            5.3.1
reportlab         3.5.59
setuptools        49.2.1
six               1.15.0
sortedcontainers  2.3.0
sqlparse          0.4.1
tblib             1.7.0
toml              0.10.2
toolz             0.11.1
tornado           6.1
typing-extensions 3.7.4.3
wrapt             1.12.1
xlrd              2.0.1
zict              2.0.0

I think it was just an indentation error; with your script corrected, it works. You may also want to use a context manager to make sure the client/cluster are closed properly:

    import pandas as pd
    df = pd.DataFrame({'col1':[1,2,3,4],'col2':[5,5,5,5]})

    def B(df):
        df['col3'] = df['col1'] + 100
        return df

    def A(df):
        from dask import delayed, compute
        import numpy as np
        from dask.distributed import Client, LocalCluster

        if __name__ == '__main__':
            # the context managers guarantee the client and cluster
            # are closed even if the computation raises
            with LocalCluster(n_workers = 2) as cluster, Client(cluster) as client:
                results_dfs = []
                df_split = np.array_split(df, 2)

                for split in df_split:
                    results_dfs.append(delayed(B)(split))

                result = delayed(pd.concat)(results_dfs)
                result = result.compute()

            # return inside the guard, so the spawned worker processes
            # (which re-import this file) never reach an unassigned 'result',
            # which was the UnboundLocalError in your traceback
            return result

    result = A(df)
    if __name__ == '__main__':
        print(result)
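To see why the `if __name__ == '__main__':` guard matters here, below is a minimal stdlib sketch using plain `multiprocessing` instead of Dask (the names `double` and `main` are just illustrative). With the `spawn` start method, which is the only one available on Windows, every worker process re-imports the script, so any top-level code outside the guard runs again in each child:

```python
import multiprocessing as mp

def double(x):
    # runs inside a worker process
    return 2 * x

def main():
    # 'spawn' is what Windows uses: each worker re-imports this file,
    # so anything at module level that is not under the __main__ guard
    # executes again in every child process.
    ctx = mp.get_context('spawn')
    with ctx.Pool(2) as pool:
        return pool.map(double, [1, 2, 3, 4])

if __name__ == '__main__':
    print(main())  # [2, 4, 6, 8]
```

The same rule applies to `LocalCluster`: reaching the cluster-creation code during a worker's re-import makes each spawned child try to start its own cluster while the module is still half-initialized, which matches the "partially initialized module 'multiprocessing.popen_spawn_win32'" circular-import error in your traceback.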