How do you use dask + distributed for NFS files?
From Matthew Rocklin's post on distributed dataframes with Dask, I'm trying to distribute some summary-statistics computations across my cluster. Setting up the cluster with dcluster ... works fine. In a notebook,
import dask.dataframe as dd
from distributed import Executor, progress
e = Executor('...:8786')
df = dd.read_csv(...)
The file I'm reading is on an NFS mount that all of the worker machines have access to. At this point I can look at df.head(), for example, and everything looks correct. From the blog post, I think I should be able to do:
df_future = e.persist(df)
progress(df_future)
# ... wait for everything to load ...
df_future.head()
But this raises an error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-26-8d59adace8bf> in <module>()
----> 1 fraudf.head()
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py in head(self, n, compute)
358
359 if compute:
--> 360 result = result.compute()
361 return result
362
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
35
36 def compute(self, **kwargs):
---> 37 return compute(self, **kwargs)[0]
38
39 @classmethod
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
108 for opt, val in groups.items()])
109 keys = [var._keys() for var in variables]
--> 110 results = get(dsk, keys, **kwargs)
111
112 results_iter = iter(results)
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
55 results = get_async(pool.apply_async, len(pool._pool), dsk, result,
56 cache=cache, queue=queue, get_id=_thread_get_id,
---> 57 **kwargs)
58
59 return results
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
479 _execute_task(task, data) # Re-execute locally
480 else:
--> 481 raise(remote_exception(res, tb))
482 state['cache'][key] = res
483 finish_task(dsk, key, state, results, keyorder.get)
AttributeError: 'Future' object has no attribute 'head'
Traceback
---------
File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 264, in execute_task
result = _execute_task(task, data)
File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 246, in _execute_task
return func(*args2)
File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py", line 354, in <lambda>
dsk = {(name, 0): (lambda x, n: x.head(n=n), (self._name, 0), n)}
What's the proper way to distribute the dataframe when it comes from a normal file system instead of HDFS?
Dask is trying to use the single-machine scheduler, which is the default when you create a dataframe with the plain dask library. Switch the default to your cluster with the following lines:
import dask
dask.set_options(get=e.get)
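Putting it together, here is a minimal sketch of the corrected workflow. The scheduler address and CSV path below are placeholders, not values from the question:

import dask
import dask.dataframe as dd
from distributed import Executor, progress

# Connect to the running distributed scheduler (hypothetical address).
e = Executor('scheduler-host:8786')

# Make the cluster the default scheduler, so implicit compute() calls
# (such as the one inside df.head()) also run on the cluster instead of
# the local threaded scheduler.
dask.set_options(get=e.get)

# Read from an NFS mount that every worker can see (hypothetical path).
df = dd.read_csv('/nfs/data/2016-*.csv')

# Load the partitions into distributed memory and watch progress.
df = e.persist(df)
progress(df)

df.head()  # now executes on the cluster instead of raising AttributeError

Note that in later releases of distributed, Executor was renamed to Client, and constructing a Client registers it as the default scheduler automatically, so the dask.set_options call is no longer needed.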