运行 DataFlow 上的光束管道时文件在关闭后被覆盖
File being overwritten after closed when running a beam pipeline on DataFlow
我已经创建了一个 Beam 管道 p
运行 在数据流上,并且想在 运行 我的管道之前向文件写入一些东西。我的代码是:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import time
pipeline_options = PipelineOptions(runner='DirectRunner')
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
myString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."
myFile3984573498534 = open('myfile2398457erity348t67349856734986739846.txt','w+')
myFile3984573498534.write(myString*100)
myFile3984573498534.close()
time.sleep(1)
r = p.run()
文件正在正确写入,但是一旦调用 p.run()
,它就会被覆盖为空白。谁能解释为什么会这样?
备注:
- 更改文件名和文件变量名不会影响结果。
- 我插入了
time.sleep(1)
,这样可以看到文件在调用p.run()
之前写入,文件被覆盖为空白。这不是必需的,可以是 changed/removed.
问题是由 pipeline_options.view_as(SetupOptions).save_main_session = True
行引起的。
当管道运行时,beam 将使用dill.dump_session
序列化主会话并将其保存到文件中。然后它将使用 dill.load_session
加载同一个文件并反序列化它以重新创建主会话。它将使用 dill.dump_session
再次重新序列化主会话以发送给运行器。序列化、反序列化,然后重新序列化主会话的原因是为了修复序列化中的不一致,如 https://github.com/uqfoundation/dill/issues/195 中提出的那样。这意味着所有跑步者都会遇到这个问题。
本例中的主会话包含 myFile3984573498534
文件对象。反序列化后,它将使用 w+
模式以与您最初打开文件相同的方式重新打开文件。这将立即覆盖该文件。然后关闭此文件,管道以文件空白结束。
最好的解决方法是在 r+
模式下打开文件,以便在主会话反序列化期间以读取模式打开文件,从而使其不被修改。
如果需要以w+
方式打开文件,关闭文件后应删除存储文件的变量,即del(myFile3984573498534)
后myFile3984573498534.close()
但在 运行 管道之前。这样可以防止变量因为不存在而被序列化,导致文件没有被修改
我已经创建了一个 Beam 管道 p
运行 在数据流上,并且想在 运行 我的管道之前向文件写入一些东西。我的代码是:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import time
pipeline_options = PipelineOptions(runner='DirectRunner')
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
myString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."
myFile3984573498534 = open('myfile2398457erity348t67349856734986739846.txt','w+')
myFile3984573498534.write(myString*100)
myFile3984573498534.close()
time.sleep(1)
r = p.run()
文件正在正确写入,但是一旦调用 p.run()
,它就会被覆盖为空白。谁能解释为什么会这样?
备注:
- 更改文件名和文件变量名不会影响结果。
- 我插入了
time.sleep(1)
,这样可以看到文件在调用p.run()
之前写入,文件被覆盖为空白。这不是必需的,可以是 changed/removed.
问题是由 pipeline_options.view_as(SetupOptions).save_main_session = True
行引起的。
当管道运行时,beam 将使用dill.dump_session
序列化主会话并将其保存到文件中。然后它将使用 dill.load_session
加载同一个文件并反序列化它以重新创建主会话。它将使用 dill.dump_session
再次重新序列化主会话以发送给运行器。序列化、反序列化,然后重新序列化主会话的原因是为了修复序列化中的不一致,如 https://github.com/uqfoundation/dill/issues/195 中提出的那样。这意味着所有跑步者都会遇到这个问题。
本例中的主会话包含 myFile3984573498534
文件对象。反序列化后,它将使用 w+
模式以与您最初打开文件相同的方式重新打开文件。这将立即覆盖该文件。然后关闭此文件,管道以文件空白结束。
最好的解决方法是在 r+
模式下打开文件,以便在主会话反序列化期间以读取模式打开文件,从而使其不被修改。
如果需要以w+
方式打开文件,关闭文件后应删除存储文件的变量,即del(myFile3984573498534)
后myFile3984573498534.close()
但在 运行 管道之前。这样可以防止变量因为不存在而被序列化,导致文件没有被修改