Dask not starting workers
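I am trying to use Dask to perform a groupby operation on a DataFrame.
The code below does not work. However, it does seem to work if I initialize the client from another console, even though I cannot see anything in the dashboard (http://localhost:8787/status): the dashboard is there, but all the plots look empty. I am on macOS.
Code: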
from datetime import datetime
import numpy as np
import os
from dask import dataframe as dd
from dask.distributed import Client
import pandas as pd
client = Client()
# open http://localhost:8787/status
csv_path = 'chicago-complete.monthly.2018-07-01-to-2018-07-31/data.csv'
dir_destination = 'data'
df = dd.read_csv(csv_path,
    dtype = {
        'timestamp': str,
        'node_id': str,
        'subsystem': str,
        'sensor': str,
        'parameter': str,
        'value_raw': str,
        'value_hrf': str,
    },
    parse_dates=['timestamp'],
    date_parser=lambda x: pd.datetime.strptime(x, '%Y/%m/%d %H:%M:%S')
)
#%%
if not os.path.exists(dir_destination):
    os.makedirs(dir_destination)
def create_node_csv(df_node):
    # test function
    return len(df_node)
res = df.groupby('node_id').apply(create_node_csv, meta=int)
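The CSV file is simply made up of string columns. My goal is to group all the rows that contain a specific value in a column and then save each group as a separate file using create_node_csv(df_node) (even though it is a dummy function for now). Any other approach is welcome, but I would like to understand what is happening here.
When I run it, the console prints the following errors several times: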
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
result_list.append(f.result())
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 208, in _start_worker
yield w._start()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 157, in _start
response = yield self.instantiate()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 226, in instantiate
self.process.start()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 370, in start
yield self.process.start()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 35, in _call_and_set_future
res = func(*args, **kwargs)
File "/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 184, in _start
process.start()
File "/anaconda3/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/anaconda3/lib/python3.7/multiprocessing/context.py", line 291, in _Popen
return Popen(process_obj)
File "/anaconda3/lib/python3.7/multiprocessing/popen_forkserver.py", line 35, in __init__
super().__init__(process_obj)
File "/anaconda3/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/anaconda3/lib/python3.7/multiprocessing/popen_forkserver.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
and:
distributed.nanny - WARNING - Worker process 1844 exited with status 1
distributed.nanny - WARNING - Restarting worker
and:
Traceback (most recent call last):
File "/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
send_bytes(obj)
File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
result_list.append(f.result())
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
yielded = self.gen.send(value)
File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 217, in _start_worker
raise gen.TimeoutError("Worker failed to start")
tornado.util.TimeoutError: Worker failed to start
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
result_list.append(f.result())
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
yielded = self.gen.send(value)
File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 217, in _start_worker
Edit:
Following the answer:
- How do I prevent the creation of a new Client if I run the program again?
- How can I do the following?
def create_node_csv(df_node):
    return len(df_node)
This returns the error below. Is it related to the meta parameter?
ValueError: cannot reindex from a duplicate axis
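When you run the script, Client() causes new Dask workers to be spawned, which also get copies of the variables from the original main process. In some cases this involves re-importing the script in each worker, and each of those imports then tries to create a Client and a new set of processes of its own.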
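The traceback in the question passes through popen_forkserver.py, so the workers there are not started with a plain fork. As a rough illustration using only the standard library, the sketch below lists which multiprocessing start methods re-import the main module and therefore need the guard. The names REIMPORTING_METHODS and reimports_main are made up for this example and are not part of Dask or of multiprocessing.

```python
import multiprocessing as mp

# Start methods whose child processes re-import the main module.
# With these, unguarded top-level code (such as Client()) runs again in each child.
REIMPORTING_METHODS = {"spawn", "forkserver"}


def reimports_main(start_method):
    """Return True if children started with this method re-import __main__."""
    return start_method in REIMPORTING_METHODS


if __name__ == "__main__":
    # Report every start method available on this platform.
    for method in mp.get_all_start_methods():
        print(method, "needs the __main__ guard:", reimports_main(method))
```

With "fork" the child inherits the parent's memory and does not re-run the script, which is probably why the same unguarded code can appear to work in some setups and fail in others.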
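The best answer is to use functions and to protect the main execution, as you normally would for anything that runs in separate processes. The following is one way to do it without changing your single-script structure: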
from datetime import datetime
import numpy as np
import os
from dask import dataframe as dd
from dask.distributed import Client
import pandas as pd
csv_path = 'chicago-complete.monthly.2018-07-01-to-2018-07-31/data.csv'
dir_destination = 'data'
def run():
    client = Client()
    df = dd.read_csv(csv_path, ...)
    if not os.path.exists(dir_destination):
        os.makedirs(dir_destination)
    def create_node_csv(df_node):
        # test function
        return len(df_node)
    res = df.groupby('node_id').apply(create_node_csv, meta=int)
    print(res.compute())
if __name__ == "__main__":
    run()
How do I prevent the creation of a new Client if I run the program again?
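In the call to Client(), you can pass the address of an existing cluster, if you know what it is. In addition, some specific types of deployment (there are several) may have a notion of a "current cluster".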
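A minimal sketch of the first option, assuming a scheduler is already running and its address is known. The helper name connect and its client_factory parameter are hypothetical, introduced only so the logic can be exercised without a live cluster. The only Dask call relied on is Client(address).

```python
def connect(address=None, client_factory=None):
    """Return a client for the cluster at `address`, or a new local one.

    `connect` and `client_factory` are hypothetical names used for this sketch.
    """
    if client_factory is None:
        # Imported lazily so the helper can be defined without Dask installed.
        from dask.distributed import Client
        client_factory = Client
    if address:
        # Attach to the scheduler that is already running at this address.
        return client_factory(address)
    # No address given: Client() with no arguments typically starts a local cluster.
    return client_factory()
```

Inside run(), client = Client() could then become something like client = connect("tcp://127.0.0.1:8786"). That address is an assumption (8786 is the usual default scheduler port), so substitute whatever your scheduler reports when it starts.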