MPI 屏障不阻止文件写入、刷新和 os.fsync

MPI barrier not blocking file write, flush and os.fsync

我有这个测试代码,它执行以下操作:

将测试消息写入文件 > 屏障 > 读取测试消息 > 断言等于 > 重复。

from __future__ import print_function
import os
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
loop = True


def main():
    global loop
    txt_write = 'buhahaha'

    with open('test', 'w') as f1:
        if rank == 0:
            f1.write(txt_write)

        f1.flush()
        os.fsync(f1.fileno())

    comm.barrier()

    with open('test') as f2:
        txt_read = f2.read()

    try:
        assert txt_read == txt_write
    except:
        print("Assertion error", txt_read, "!=", txt_write, 'rank=', rank)
        loop = False
    finally:
        comm.barrier()
        if rank == 0:
            os.remove('test')


if __name__ == '__main__':
    i = 0
    while loop:
        main()
        if i % 1000 == 0 and rank == 0:
            print("Iterations:", i)

        i += 1

它适用于少数 100 或 1000 次迭代,但有一次它读取一个空文件并且断言失败。其他答案建议使用 flushos.fsync,但这似乎没有帮助 - 它只会使执行速度变慢。知道如何解决这个问题吗?

也许你可以尝试这样的事情,而不是:

if rank == 0:
  with open('test', 'w') as f1:
    f1.write(txt_write)
    # as @jschultz410 correctly pointed out, 
    # we remove f1.flush() and f1.close()

comm.barrier()

with open('test') as f2:
  txt_read = f2.read()

代码导致 race condition 所有进程同时打开同一个文件。感谢@jschultz410 和@mko 识别这个逻辑错误。

我的代码解决方案是使用内存流而不是真实文件。现在,代码的 open、writeread 部分变为:

from io import StringIO

f1 = StringIO()
if rank == 0:
    f1.write(txt_write)

f1.flush()
comm.barrier()

txt_read = f1.getvalue()