Dask Bag.to_textfiles works with single partition but not multiple
Very brief.. is this a bug, or am I missing something?
tmp_j is a bag holding a single item in 6 partitions; I get similar behavior with larger bags, though. This particular bag was built with:
>>> tmp_j = jnode_b.filter(lambda r: (r['node']['attrib']['uid'] == '8909') &
                                     (r['node']['attrib']['version'] == '1')).pluck('node').pluck('attrib')
and looks like:
>>> tmp_j.compute()
[{'changeset': '39455176',
'id': '4197394169',
'lat': '53.4803608',
'lon': '-113.4955328',
'timestamp': '2016-05-20T16:43:02Z',
'uid': '8909',
'user': 'mvexel',
'version': '1'}]
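For anyone without the OSM data at hand, a filtered db.range reproduces the shape of tmp_j (a single surviving element in a multi-partition bag); this is a hypothetical stand-in, not the original data:
>>> import dask.bag as db
>>> # 6-partition bag where only one partition holds data after the filter,
>>> # matching the single-item, 6-partition shape of tmp_j
>>> tmp_j_like = db.range(6, npartitions=6).filter(lambda x: x == 1)
>>> tmp_j_like.npartitions
6
>>> tmp_j_like.compute()
[1]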
Thanks again..
>>> tmp_j.repartition(1).map(json.dumps).to_textfiles('tmpA*.json')
works fine (writes the file), but
>>> tmp_j.map(json.dumps).to_textfiles('tmpA*.json')
gives
StopIteration Traceback (most recent call last)
<ipython-input-28-a77a33e2ff26> in <module>()
----> 1 tmp_j.map(json.dumps).to_textfiles('tmp*.json')
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(self, path, name_function, compression, encoding, compute)
469 def to_textfiles(self, path, name_function=str, compression='infer',
470 encoding=system_encoding, compute=True):
--> 471 return to_textfiles(self, path, name_function, compression, encoding, compute)
472
473 def fold(self, binop, combine=None, initial=no_default, split_every=None):
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(b, path, name_function, compression, encoding, compute)
167 result = Bag(merge(b.dask, dsk), name, b.npartitions)
168 if compute:
--> 169 result.compute()
170 else:
171 return result
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
35
36 def compute(self, **kwargs):
---> 37 return compute(self, **kwargs)[0]
38
39 @classmethod
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
108 for opt, val in groups.items()])
109 keys = [var._keys() for var in variables]
--> 110 results = get(dsk, keys, **kwargs)
111
112 results_iter = iter(results)
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/multiprocessing.py in get(dsk, keys, optimizations, num_workers, func_loads, func_dumps, **kwargs)
76 # Run
77 result = get_async(apply_async, len(pool._pool), dsk3, keys,
---> 78 queue=queue, get_id=_process_get_id, **kwargs)
79 finally:
80 if cleanup:
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
486 _execute_task(task, data) # Re-execute locally
487 else:
--> 488 raise(remote_exception(res, tb))
489 state['cache'][key] = res
490 finish_task(dsk, key, state, results, keyorder.get)
StopIteration:
Traceback
---------
File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 267, in execute_task
result = _execute_task(task, data)
File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 249, in _execute_task
return func(*args2)
File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py", line 1024, in write
firstline = next(data)
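That last frame points at the mechanism: write pulls the first line of each partition with next(data), and on a partition that holds no elements next raises StopIteration. A minimal illustration of the failure mode:
>>> data = iter([])          # an empty partition, as the filter leaves behind
>>> firstline = next(data)   # raises StopIteration, exactly as in the trace
Traceback (most recent call last):
  ...
StopIteration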
Note:
>>> tmp_b = db.from_sequence(tmp_j, partition_size=3)
>>> tmp_b.map(json.dumps).to_textfiles('tmp*.json')
works fine (but, again, tmp_b.npartitions == 1).
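To check that the filter really is what leaves partitions empty, counting elements per partition is a quick diagnostic; a sketch assuming tmp_j as above, with illustrative output:
>>> tmp_j.map_partitions(lambda part: [sum(1 for _ in part)]).compute()
[0, 1, 0, 0, 0, 0]   # e.g.: one record, five empty partitions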
Thanks again for the insight - I did look at the source, but then realized my smart/lazy ratio was too low.
I'll submit docs once I'm confident I've got a handle on this.
This was a genuine bug, now resolved in master:
In [1]: import dask.bag as db
In [2]: db.range(5, npartitions=5).filter(lambda x: x == 1).map(str).to_textfiles('*.txt')
In [3]: ls *.txt
0.txt 1.txt 2.txt 3.txt 4.txt foo.txt
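For anyone pinned to a pre-fix release, the failing step can be guarded by hand. A sketch of the kind of guard involved (a hypothetical write_lines helper, not dask's actual patch):

def write_lines(lines, f):
    # Write an iterable of strings to an open file f, one per line,
    # tolerating an empty partition instead of raising StopIteration.
    it = iter(lines)
    try:
        first = next(it)
    except StopIteration:
        return  # empty partition: leave the file empty
    f.write(first)
    for line in it:
        f.write('\n')
        f.write(line)

That said, the repartition(1) workaround above remains the simplest route on older versions.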