运行 DataFlow 上的光束管道时文件在关闭后被覆盖

File being overwritten after closed when running a beam pipeline on DataFlow

我已经创建了一个 Beam 管道 p 运行 在数据流上,并且想在 运行 我的管道之前向文件写入一些东西。我的代码是:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import time

pipeline_options = PipelineOptions(runner='DirectRunner')
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)

myString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."

myFile3984573498534 = open('myfile2398457erity348t67349856734986739846.txt','w+')
myFile3984573498534.write(myString*100)
myFile3984573498534.close()

time.sleep(1)

r = p.run()

文件正在正确写入,但是一旦调用 p.run(),它就会被覆盖为空白。谁能解释为什么会这样?

备注:

问题是由 pipeline_options.view_as(SetupOptions).save_main_session = True 行引起的。

当管道运行时,beam 将使用dill.dump_session 序列化主会话并将其保存到文件中。然后它将使用 dill.load_session 加载同一个文件并反序列化它以重新创建主会话。它将使用 dill.dump_session 再次重新序列化主会话以发送给运行器。序列化、反序列化,然后重新序列化主会话的原因是为了修复序列化中的不一致,如 https://github.com/uqfoundation/dill/issues/195 中提出的那样。这意味着所有跑步者都会遇到这个问题。

本例中的主会话包含 myFile3984573498534 文件对象。反序列化后,它将使用 w+ 模式以与您最初打开文件相同的方式重新打开文件。这将立即覆盖该文件。然后关闭此文件,管道以文件空白结束。

最好的解决方法是在 r+ 模式下打开文件,以便在主会话反序列化期间以读取模式打开文件,从而使其不被修改。

如果需要w+方式打开文件,关闭文件后应删除存储文件的变量,即del(myFile3984573498534)myFile3984573498534.close() 但在 运行 管道之前。这样可以防止变量因为不存在而被序列化,导致文件没有被修改