MPI 屏障不阻止文件写入、刷新和 os.fsync
MPI barrier not blocking file write, flush and os.fsync
我有这个测试代码,它执行以下操作:
将测试消息写入文件 > 屏障 > 读取测试消息 > 断言等于 > 重复。
from __future__ import print_function
import os
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
loop = True
def main():
global loop
txt_write = 'buhahaha'
with open('test', 'w') as f1:
if rank == 0:
f1.write(txt_write)
f1.flush()
os.fsync(f1.fileno())
comm.barrier()
with open('test') as f2:
txt_read = f2.read()
try:
assert txt_read == txt_write
except:
print("Assertion error", txt_read, "!=", txt_write, 'rank=', rank)
loop = False
finally:
comm.barrier()
if rank == 0:
os.remove('test')
if __name__ == '__main__':
i = 0
while loop:
main()
if i % 1000 == 0 and rank == 0:
print("Iterations:", i)
i += 1
它适用于少数 100 或 1000 次迭代,但有一次它读取一个空文件并且断言失败。其他答案建议使用 flush
和 os.fsync
,但这似乎没有帮助 - 它只会使执行速度变慢。知道如何解决这个问题吗?
也许你可以尝试这样的事情,而不是:
if rank == 0:
with open('test', 'w') as f1:
f1.write(txt_write)
# as @jschultz410 correctly pointed out,
# we remove f1.flush() and f1.close()
comm.barrier()
with open('test') as f2:
txt_read = f2.read()
代码导致 race condition 所有进程同时打开同一个文件。感谢@jschultz410 和@mko 识别这个逻辑错误。
我的代码解决方案是使用内存流而不是真实文件。现在,代码的 open、write 和 read 部分变为:
from io import StringIO
f1 = StringIO()
if rank == 0:
f1.write(txt_write)
f1.flush()
comm.barrier()
txt_read = f1.getvalue()
我有这个测试代码,它执行以下操作:
将测试消息写入文件 > 屏障 > 读取测试消息 > 断言等于 > 重复。
from __future__ import print_function
import os
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
loop = True
def main():
global loop
txt_write = 'buhahaha'
with open('test', 'w') as f1:
if rank == 0:
f1.write(txt_write)
f1.flush()
os.fsync(f1.fileno())
comm.barrier()
with open('test') as f2:
txt_read = f2.read()
try:
assert txt_read == txt_write
except:
print("Assertion error", txt_read, "!=", txt_write, 'rank=', rank)
loop = False
finally:
comm.barrier()
if rank == 0:
os.remove('test')
if __name__ == '__main__':
i = 0
while loop:
main()
if i % 1000 == 0 and rank == 0:
print("Iterations:", i)
i += 1
它适用于少数 100 或 1000 次迭代,但有一次它读取一个空文件并且断言失败。其他答案建议使用 flush
和 os.fsync
,但这似乎没有帮助 - 它只会使执行速度变慢。知道如何解决这个问题吗?
也许你可以尝试这样的事情,而不是:
if rank == 0:
with open('test', 'w') as f1:
f1.write(txt_write)
# as @jschultz410 correctly pointed out,
# we remove f1.flush() and f1.close()
comm.barrier()
with open('test') as f2:
txt_read = f2.read()
代码导致 race condition 所有进程同时打开同一个文件。感谢@jschultz410 和@mko 识别这个逻辑错误。
我的代码解决方案是使用内存流而不是真实文件。现在,代码的 open、write 和 read 部分变为:
from io import StringIO
f1 = StringIO()
if rank == 0:
f1.write(txt_write)
f1.flush()
comm.barrier()
txt_read = f1.getvalue()