Python 中使用 joblib 的并行循环抛出奇怪的错误
Parallel loop in python with joblib throws weird error
我正在尝试在 Python 中运行一个非常简单的并行循环：
from joblib import Parallel, delayed
# NOTE(review): `np` is used but never imported in this snippet; it presumably
# comes from an earlier `import numpy as np` (e.g. a previous notebook cell).
my_array = np.zeros((2,3))
def foo(array,x):
    """Set columns 0, 1 and 2 of row ``x`` of ``array`` to 25 in place, then print it.

    There is no return statement, so every call returns ``None``.
    """
    for i in [0,1,2]:
        array[x][i]=25
    print(array, id(array), 'arrays in workers')
def main(array):
    """Try to run ``foo`` on rows 0 and 1 of ``array`` with joblib's ``Parallel``.

    BUG (the error reported in this question): ``(foo)(array,i)`` *calls*
    ``foo`` right away in this process while the generator is consumed, so
    ``Parallel`` receives the ``None`` values ``foo`` returns instead of
    ``(function, args, kwargs)`` tasks. The worker then fails while unpacking
    them with ``TypeError: cannot unpack non-iterable NoneType object``.
    Because ``foo`` already ran here, ``array`` ends up modified anyway.
    Fix: build the tasks with ``delayed(foo)(array, i)``.
    """
    print(id(array), 'Original array')
    inputs = [0,1]
    # NOTE(review): a `__name__` guard inside a function body does not stop
    # the module-level `main(my_array)` call below from running on import;
    # the guard normally wraps that call instead.
    if __name__ == '__main__':
        Parallel(n_jobs=8, verbose = 0)((foo)(array,i) for i in inputs)
        # print(my_array, id(array), 'Original array')
main(my_array)
数组最终确实被修改了，但我收到了以下错误：
---------------------------------------------------------------------------
_RemoteTraceback Traceback (most recent call last)
_RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/john/.local/lib/python3.8/site-packages/joblib/externals/loky/process_executor.py", line 431, in _process_worker
r = call_item()
File "/home/john/.local/lib/python3.8/site-packages/joblib/externals/loky/process_executor.py", line 285, in __call__
return self.fn(*self.args, **self.kwargs)
File "/home/john/.local/lib/python3.8/site-packages/joblib/_parallel_backends.py", line 595, in __call__
return self.func(*args, **kwargs)
File "/home/john/.local/lib/python3.8/site-packages/joblib/parallel.py", line 252, in __call__
return [func(*args, **kwargs)
File "/home/john/.local/lib/python3.8/site-packages/joblib/parallel.py", line 253, in <listcomp>
for func, args, kwargs in self.items]
TypeError: cannot unpack non-iterable NoneType object
"""
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
<ipython-input-74-e1b992b5617f> in <module>
15 # print(my_array, id(array), 'Original array')
16
---> 17 main(my_array)
<ipython-input-74-e1b992b5617f> in main(array)
12 inputs = [0,1]
13 if __name__ == '__main__':
---> 14 Parallel(n_jobs=8, verbose = 0)((foo)(array,i) for i in inputs)
15 # print(my_array, id(array), 'Original array')
16
~/.local/lib/python3.8/site-packages/joblib/parallel.py in __call__(self, iterable)
1040
1041 with self._backend.retrieval_context():
-> 1042 self.retrieve()
1043 # Make sure that we get a last message telling us we are done
1044 elapsed_time = time.time() - self._start_time
~/.local/lib/python3.8/site-packages/joblib/parallel.py in retrieve(self)
919 try:
920 if getattr(self._backend, 'supports_timeout', False):
--> 921 self._output.extend(job.get(timeout=self.timeout))
922 else:
923 self._output.extend(job.get())
~/.local/lib/python3.8/site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
540 AsyncResults.get from multiprocessing."""
541 try:
--> 542 return future.result(timeout=timeout)
543 except CfTimeoutError as e:
544 raise TimeoutError from e
/usr/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
442 raise CancelledError()
443 elif self._state == FINISHED:
--> 444 return self.__get_result()
445 else:
446 raise TimeoutError()
/usr/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
387 if self._exception:
388 try:
--> 389 raise self._exception
390 finally:
391 # Break a reference cycle with the exception in self._exception
TypeError: cannot unpack non-iterable NoneType object
既然数组已经被修改了，我可以把所有代码包在 try/except 语句里，假装它能正常工作；但我很想知道如何真正消除这个错误。
感谢您抽出时间。
此致
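你缺少的是 joblib 中的 `delayed` 函数。在 `Parallel` 调用中用 `delayed` 包装目标函数，代码就能正常执行而不报错。原因是：不加 `delayed` 时，`(foo)(array, i)` 会在主进程中被立即调用并返回 `None`，于是 `Parallel` 收到的是一串 `None`，而不是它期望的 `(函数, 位置参数, 关键字参数)` 任务元组——这正是 `TypeError: cannot unpack non-iterable NoneType object` 的来源；也正因为 `foo` 已经在主进程中执行过，数组才仍然被修改了。例如：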
import numpy as np
from joblib import Parallel, delayed
my_array = np.zeros((2, 3))


def foo(array, x):
    """Set every element of row ``x`` of ``array`` to 25 in place, then print it.

    Returns ``None``, so it has to be wrapped with ``delayed`` before being
    handed to ``Parallel``.

    Parameters
    ----------
    array : numpy.ndarray
        2-D array, modified in place.
    x : int
        Index of the row to fill.
    """
    # One vectorised assignment fills the whole row, instead of looping over
    # the hard-coded columns [0, 1, 2], which only matched a 3-column array.
    array[x, :] = 25
    print(array, id(array), 'arrays in workers')
def main(array):
    """Run ``foo`` on every row of ``array`` in parallel, one joblib task per row.

    ``delayed(foo)(array, i)`` does not call ``foo``; it packages the function
    and its arguments into a ``(function, args, kwargs)`` task that
    ``Parallel`` dispatches to a worker. Calling ``foo(array, i)`` directly
    would run it eagerly here and hand ``Parallel`` its ``None`` return
    values, which raises ``TypeError: cannot unpack non-iterable NoneType
    object``.

    ``foo`` mutates ``array`` in place, so the workers must share memory with
    the caller. ``require='sharedmem'`` enforces a thread-based backend. It is
    a hard constraint, unlike the ``prefer='threads'`` hint, which an enclosing
    ``parallel_backend`` context can override. With a process-based backend
    the workers would receive copies or read-only memory maps of ``array``,
    and the writes would not reach the caller.

    Parameters
    ----------
    array : numpy.ndarray
        Array whose rows are filled in place.
    """
    print(id(array), 'Original array')
    inputs = range(len(array))
    Parallel(n_jobs=8, verbose=0, require='sharedmem')(
        delayed(foo)(array, i) for i in inputs
    )
    # With shared memory the workers' writes are visible here.
    print(array, id(array), 'Original array after the parallel loop')


if __name__ == '__main__':
    # Guard the entry point at module level so importing this module (for
    # example from a worker process) does not start the parallel loop.
    main(my_array)
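注意示例让任务在线程中运行：线程共享同一个 `array` 对象，因此 `foo` 的原地修改对调用方可见；而在默认的基于进程的后端中，工作进程拿到的是数组的副本（或只读内存映射），修改不会传回主进程。关于 `delayed` 的原理和技术细节，请阅读（原帖所链接问题的）已采纳答案，以了解 `delayed` 在你的代码中的作用。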
我正在尝试在 Python 中运行一个非常简单的并行循环：
from joblib import Parallel, delayed
# NOTE(review): `np` is used but never imported in this snippet; it presumably
# comes from an earlier `import numpy as np` (e.g. a previous notebook cell).
my_array = np.zeros((2,3))
def foo(array,x):
    """Set columns 0, 1 and 2 of row ``x`` of ``array`` to 25 in place, then print it.

    There is no return statement, so every call returns ``None``.
    """
    for i in [0,1,2]:
        array[x][i]=25
    print(array, id(array), 'arrays in workers')
def main(array):
    """Try to run ``foo`` on rows 0 and 1 of ``array`` with joblib's ``Parallel``.

    BUG (the error reported in this question): ``(foo)(array,i)`` *calls*
    ``foo`` right away in this process while the generator is consumed, so
    ``Parallel`` receives the ``None`` values ``foo`` returns instead of
    ``(function, args, kwargs)`` tasks. The worker then fails while unpacking
    them with ``TypeError: cannot unpack non-iterable NoneType object``.
    Because ``foo`` already ran here, ``array`` ends up modified anyway.
    Fix: build the tasks with ``delayed(foo)(array, i)``.
    """
    print(id(array), 'Original array')
    inputs = [0,1]
    # NOTE(review): a `__name__` guard inside a function body does not stop
    # the module-level `main(my_array)` call below from running on import;
    # the guard normally wraps that call instead.
    if __name__ == '__main__':
        Parallel(n_jobs=8, verbose = 0)((foo)(array,i) for i in inputs)
        # print(my_array, id(array), 'Original array')
main(my_array)
数组最终确实被修改了，但我收到了以下错误：
---------------------------------------------------------------------------
_RemoteTraceback Traceback (most recent call last)
_RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/john/.local/lib/python3.8/site-packages/joblib/externals/loky/process_executor.py", line 431, in _process_worker
r = call_item()
File "/home/john/.local/lib/python3.8/site-packages/joblib/externals/loky/process_executor.py", line 285, in __call__
return self.fn(*self.args, **self.kwargs)
File "/home/john/.local/lib/python3.8/site-packages/joblib/_parallel_backends.py", line 595, in __call__
return self.func(*args, **kwargs)
File "/home/john/.local/lib/python3.8/site-packages/joblib/parallel.py", line 252, in __call__
return [func(*args, **kwargs)
File "/home/john/.local/lib/python3.8/site-packages/joblib/parallel.py", line 253, in <listcomp>
for func, args, kwargs in self.items]
TypeError: cannot unpack non-iterable NoneType object
"""
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
<ipython-input-74-e1b992b5617f> in <module>
15 # print(my_array, id(array), 'Original array')
16
---> 17 main(my_array)
<ipython-input-74-e1b992b5617f> in main(array)
12 inputs = [0,1]
13 if __name__ == '__main__':
---> 14 Parallel(n_jobs=8, verbose = 0)((foo)(array,i) for i in inputs)
15 # print(my_array, id(array), 'Original array')
16
~/.local/lib/python3.8/site-packages/joblib/parallel.py in __call__(self, iterable)
1040
1041 with self._backend.retrieval_context():
-> 1042 self.retrieve()
1043 # Make sure that we get a last message telling us we are done
1044 elapsed_time = time.time() - self._start_time
~/.local/lib/python3.8/site-packages/joblib/parallel.py in retrieve(self)
919 try:
920 if getattr(self._backend, 'supports_timeout', False):
--> 921 self._output.extend(job.get(timeout=self.timeout))
922 else:
923 self._output.extend(job.get())
~/.local/lib/python3.8/site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
540 AsyncResults.get from multiprocessing."""
541 try:
--> 542 return future.result(timeout=timeout)
543 except CfTimeoutError as e:
544 raise TimeoutError from e
/usr/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
442 raise CancelledError()
443 elif self._state == FINISHED:
--> 444 return self.__get_result()
445 else:
446 raise TimeoutError()
/usr/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
387 if self._exception:
388 try:
--> 389 raise self._exception
390 finally:
391 # Break a reference cycle with the exception in self._exception
TypeError: cannot unpack non-iterable NoneType object
既然数组已经被修改了，我可以把所有代码包在 try/except 语句里，假装它能正常工作；但我很想知道如何真正消除这个错误。感谢您抽出时间。此致
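你缺少的是 joblib 中的 `delayed` 函数。在 `Parallel` 调用中用 `delayed` 包装目标函数，代码就能正常执行而不报错。原因是：不加 `delayed` 时，`(foo)(array, i)` 会在主进程中被立即调用并返回 `None`，于是 `Parallel` 收到的是一串 `None`，而不是它期望的 `(函数, 位置参数, 关键字参数)` 任务元组——这正是 `TypeError: cannot unpack non-iterable NoneType object` 的来源；也正因为 `foo` 已经在主进程中执行过，数组才仍然被修改了。例如：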
import numpy as np
from joblib import Parallel, delayed
my_array = np.zeros((2, 3))


def foo(array, x):
    """Set every element of row ``x`` of ``array`` to 25 in place, then print it.

    Returns ``None``, so it has to be wrapped with ``delayed`` before being
    handed to ``Parallel``.

    Parameters
    ----------
    array : numpy.ndarray
        2-D array, modified in place.
    x : int
        Index of the row to fill.
    """
    # One vectorised assignment fills the whole row, instead of looping over
    # the hard-coded columns [0, 1, 2], which only matched a 3-column array.
    array[x, :] = 25
    print(array, id(array), 'arrays in workers')
def main(array):
    """Run ``foo`` on every row of ``array`` in parallel, one joblib task per row.

    ``delayed(foo)(array, i)`` does not call ``foo``; it packages the function
    and its arguments into a ``(function, args, kwargs)`` task that
    ``Parallel`` dispatches to a worker. Calling ``foo(array, i)`` directly
    would run it eagerly here and hand ``Parallel`` its ``None`` return
    values, which raises ``TypeError: cannot unpack non-iterable NoneType
    object``.

    ``foo`` mutates ``array`` in place, so the workers must share memory with
    the caller. ``require='sharedmem'`` enforces a thread-based backend. It is
    a hard constraint, unlike the ``prefer='threads'`` hint, which an enclosing
    ``parallel_backend`` context can override. With a process-based backend
    the workers would receive copies or read-only memory maps of ``array``,
    and the writes would not reach the caller.

    Parameters
    ----------
    array : numpy.ndarray
        Array whose rows are filled in place.
    """
    print(id(array), 'Original array')
    inputs = range(len(array))
    Parallel(n_jobs=8, verbose=0, require='sharedmem')(
        delayed(foo)(array, i) for i in inputs
    )
    # With shared memory the workers' writes are visible here.
    print(array, id(array), 'Original array after the parallel loop')


if __name__ == '__main__':
    # Guard the entry point at module level so importing this module (for
    # example from a worker process) does not start the parallel loop.
    main(my_array)
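注意示例让任务在线程中运行：线程共享同一个 `array` 对象，因此 `foo` 的原地修改对调用方可见；而在默认的基于进程的后端中，工作进程拿到的是数组的副本（或只读内存映射），修改不会传回主进程。关于 `delayed` 的原理和技术细节，请阅读（原帖所链接问题的）已采纳答案，以了解 `delayed` 在你的代码中的作用。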