Python multiprocessing race condition
I ran into a strange error while using concurrent.futures to read several text files.
Here is a small reproducible example:
import os
import concurrent.futures

def read_file(file):
    with open(os.path.join(data_dir, file), buffering=1000) as f:
        for row in f:
            try:
                print(row)
            except Exception as e:
                print(str(e))

if __name__ == '__main__':
    data_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'data'))
    files = ['file1', 'file2']
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for file, _ in zip(files, executor.map(read_file, files)):
            pass
file1 and file2 are arbitrary text files in the data directory.
I get the following error (basically, the worker process tries to read the data_dir variable before it has been assigned):
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "C:\Users\my_username\AppData\Local\Continuum\Anaconda3\lib\concurrent\futures\process.py", line 175, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "C:\Users\my_username\AppData\Local\Continuum\Anaconda3\lib\concurrent\futures\process.py", line 153, in _process_chunk
    return [fn(*args) for args in chunk]
  File "C:\Users\my_username\AppData\Local\Continuum\Anaconda3\lib\concurrent\futures\process.py", line 153, in <listcomp>
    return [fn(*args) for args in chunk]
  File "C:\Users\my_username\Downloads\example.py", line 5, in read_file
    with open(os.path.join(data_dir, file),buffering=1000) as f:
NameError: name 'data_dir' is not defined
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "example.py", line 16, in <module>
    for file,_ in zip(files,executor.map(read_file,files)):
  File "C:\Users\my_username\AppData\Local\Continuum\Anaconda3\lib\concurrent\futures\_base.py", line 556, in result_iterator
    yield future.result()
  File "C:\Users\my_username\AppData\Local\Continuum\Anaconda3\lib\concurrent\futures\_base.py", line 405, in result
    return self.__get_result()
  File "C:\Users\my_username\AppData\Local\Continuum\Anaconda3\lib\concurrent\futures\_base.py", line 357, in __get_result
    raise self._exception
NameError: name 'data_dir' is not defined
If I put the data_dir assignment before the if __name__ == '__main__': block, I don't get this error and the code runs as expected.
What causes this error? Surely in both cases data_dir is assigned before any asynchronous calls are made.
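ProcessPoolExecutor starts new Python processes, which import the relevant module and then call the function you supplied.
Since data_dir is only defined when you run the module, not when you import it, the error is to be expected.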
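The difference between running and importing a script can be sketched without starting any processes. As far as I know, CPython's spawn start method re-executes the parent's main script in each worker through runpy under the module name __mp_main__, so the body of the __main__ guard never runs there. The sketch below imitates that with runpy.run_path; SCRIPT and simulate are hypothetical names used only for illustration.

```python
import os
import runpy
import tempfile

# Hypothetical stand-in for the question's script: one global assigned at
# module level, one assigned only under the __main__ guard.
SCRIPT = '''
MODULE_LEVEL = "assigned on every import"
if __name__ == "__main__":
    GUARDED = "assigned only when run as a script"
'''


def simulate(run_name):
    """Execute SCRIPT as a module named run_name and report which of the
    two globals ended up defined."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'example.py')
        with open(path, 'w') as f:
            f.write(SCRIPT)
        module_globals = runpy.run_path(path, run_name=run_name)
    return sorted(n for n in ('MODULE_LEVEL', 'GUARDED') if n in module_globals)


print(simulate('__main__'))     # parent process: ['GUARDED', 'MODULE_LEVEL']
print(simulate('__mp_main__'))  # spawned worker: ['MODULE_LEVEL']
```

In the question, data_dir plays the role of GUARDED, which is why the worker raises NameError.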
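Passing data_dir to read_file as an argument should work. data_dir is only a path string, not a file descriptor, and the arguments you hand to executor.map are pickled and sent to the worker processes, so the workers no longer depend on a global that was never assigned in their copy of the module.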
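A minimal sketch of that approach, assuming it is acceptable for read_file to return each file's contents instead of printing row by row. functools.partial binds data_dir as the first argument; read_all and its executor_cls parameter are hypothetical helpers added for illustration, not part of concurrent.futures.

```python
import concurrent.futures
import functools
import os


def read_file(data_dir, file):
    # data_dir now arrives as an argument instead of being a module global.
    with open(os.path.join(data_dir, file)) as f:
        return f.read()


def read_all(data_dir, files, executor_cls=concurrent.futures.ProcessPoolExecutor):
    # Hypothetical helper. With ProcessPoolExecutor, the partial object
    # (including the data_dir string it holds) is pickled and sent to the
    # worker processes along with the file names.
    worker = functools.partial(read_file, data_dir)
    with executor_cls() as executor:
        return list(executor.map(worker, files))
```

With this version you would still compute data_dir inside the if __name__ == '__main__': block and call read_all(data_dir, files) there; the workers never look up a global.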
However, if you use ThreadPoolExecutor instead, your example should work, because the threads it creates run in the same process and share its memory.
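fork() is not available on Windows, so Python uses spawn to start new processes. This launches a fresh Python interpreter process that shares no memory with the parent, but Python tries to recreate the worker function's environment in the new process by re-importing the main module, which is why a module-level variable works. See the multiprocessing documentation for more detail.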
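To check which start method a platform uses, or to reproduce the Windows behavior on Linux or macOS, a sketch along these lines should work (the mp_context argument needs Python 3.7 or later). describe_start_methods and make_spawn_executor are hypothetical helpers, not library functions.

```python
import concurrent.futures
import multiprocessing
import sys


def describe_start_methods():
    """Return (default start method, all start methods available here)."""
    return multiprocessing.get_start_method(), multiprocessing.get_all_start_methods()


def make_spawn_executor(max_workers=None):
    # Hypothetical helper: force the 'spawn' start method so workers behave
    # like the Windows ones in the question, i.e. each worker is a fresh
    # interpreter that re-imports the main module and skips its __main__ block.
    ctx = multiprocessing.get_context('spawn')
    return concurrent.futures.ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx)


if __name__ == '__main__':
    default, available = describe_start_methods()
    # On Windows, 'spawn' is the only available method.
    print(f'platform={sys.platform} default={default} available={available}')
```

Running the question's code with make_spawn_executor() in place of ProcessPoolExecutor() should raise the same NameError on any platform. Note that calling multiprocessing.get_start_method() also fixes the default start method, so a later multiprocessing.set_start_method() call would need force=True.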