数据流管道中 WriteToText 文件的 ERRNO2
ERRNO2 for WriteToText files in a Dataflow pipeline
我有一个分支管道,其中包含多个 ParDo
转换,这些转换被合并并写入 GCS 存储桶中的文本文件记录。
我的管道崩溃后收到以下消息:
The worker lost contact with the service.
RuntimeError: FileNotFoundError: [Errno 2] Not found: gs://MYBUCKET/JOBNAME.00000-of-00001.avro [while running 'WriteToText/WriteToText/Write/WriteImpl/WriteBundles/WriteBundles']
它似乎找不到它正在写入的日志文件。直到发生错误的某个点之前似乎都很好。我想用 try:
/ except:
围绕它或断点,但我什至不确定如何发现根本原因。
有没有办法只写一个文件?还是只打开一个文件写一次?它将数千个输出文件发送到这个存储桶中,这是我想消除的,可能是一个因素。
with beam.Pipeline(argv=pipeline_args) as p:
csvlines = (
p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
| 'Parse CSV to Dictionary' >> beam.ParDo(Split())
| 'Read Files into Memory' >> beam.ParDo(DownloadFilesDoFn())
| 'Windowing' >> beam.WindowInto(window.FixedWindows(20 * 60))
)
b1 = ( csvlines | 'Branch1' >> beam.ParDo(Branch1DoFn()) )
b2 = ( csvlines | 'Branch2' >> beam.ParDo(Branch2DoFn()) )
b3 = ( csvlines | 'Branch3' >> beam.ParDo(Branch3DoFn()) )
b4 = ( csvlines | 'Branch4' >> beam.ParDo(Branch4DoFn()) )
b5 = ( csvlines | 'Branch5' >> beam.ParDo(Branch5DoFn()) )
b6 = ( csvlines | 'Branch6' >> beam.ParDo(Branch6DoFn()) )
output = (
(b1,b2,b3,b4,b5,b6) | 'Merge PCollections' >> beam.Flatten()
| 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
)
此问题链接到 上一个问题,后者包含有关实施的更多详细信息。那里的解决方案建议在每次调用 ParDo(DoFn)
的 start_bundle()
中创建一个 google.cloud.storage.Client()
的实例。这连接到同一个 gcs 桶 - 通过 WriteToText(known_args.output)
中的参数给出
class DownloadFilesDoFn(beam.DoFn):
def __init__(self):
import re
self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')
def start_bundle(self):
import google.cloud.storage
self.gcs = google.cloud.storage.Client()
def process(self, element):
self.file_match = self.gcs_path_regex.match(element['Url'])
self.bucket = self.gcs.get_bucket(self.file_match.group(1))
self.blob = self.bucket.get_blob(self.file_match.group(2))
self.f = self.blob.download_as_bytes()
此错误的原因可能与与客户端的连接过多有关。我不清楚这方面的良好做法 - 因为 您可以通过这种方式为每个包设置网络连接。
将此添加到末尾以从捆绑包末尾的内存中删除客户端对象应该有助于关闭一些不必要的延迟连接。
def finish_bundle(self):
del self.gcs, self.gcs_path_regex
我有一个分支管道,其中包含多个 ParDo
转换,这些转换被合并并写入 GCS 存储桶中的文本文件记录。
我的管道崩溃后收到以下消息:
The worker lost contact with the service.
RuntimeError: FileNotFoundError: [Errno 2] Not found: gs://MYBUCKET/JOBNAME.00000-of-00001.avro [while running 'WriteToText/WriteToText/Write/WriteImpl/WriteBundles/WriteBundles']
它似乎找不到它正在写入的日志文件。直到发生错误的某个点之前似乎都很好。我想用 try:
/ except:
围绕它或断点,但我什至不确定如何发现根本原因。
有没有办法只写一个文件?还是只打开一个文件写一次?它将数千个输出文件发送到这个存储桶中,这是我想消除的,可能是一个因素。
with beam.Pipeline(argv=pipeline_args) as p:
csvlines = (
p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
| 'Parse CSV to Dictionary' >> beam.ParDo(Split())
| 'Read Files into Memory' >> beam.ParDo(DownloadFilesDoFn())
| 'Windowing' >> beam.WindowInto(window.FixedWindows(20 * 60))
)
b1 = ( csvlines | 'Branch1' >> beam.ParDo(Branch1DoFn()) )
b2 = ( csvlines | 'Branch2' >> beam.ParDo(Branch2DoFn()) )
b3 = ( csvlines | 'Branch3' >> beam.ParDo(Branch3DoFn()) )
b4 = ( csvlines | 'Branch4' >> beam.ParDo(Branch4DoFn()) )
b5 = ( csvlines | 'Branch5' >> beam.ParDo(Branch5DoFn()) )
b6 = ( csvlines | 'Branch6' >> beam.ParDo(Branch6DoFn()) )
output = (
(b1,b2,b3,b4,b5,b6) | 'Merge PCollections' >> beam.Flatten()
| 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
)
此问题链接到 ParDo(DoFn)
的 start_bundle()
中创建一个 google.cloud.storage.Client()
的实例。这连接到同一个 gcs 桶 - 通过 WriteToText(known_args.output)
class DownloadFilesDoFn(beam.DoFn):
def __init__(self):
import re
self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')
def start_bundle(self):
import google.cloud.storage
self.gcs = google.cloud.storage.Client()
def process(self, element):
self.file_match = self.gcs_path_regex.match(element['Url'])
self.bucket = self.gcs.get_bucket(self.file_match.group(1))
self.blob = self.bucket.get_blob(self.file_match.group(2))
self.f = self.blob.download_as_bytes()
此错误的原因可能与与客户端的连接过多有关。我不清楚这方面的良好做法 - 因为
将此添加到末尾以从捆绑包末尾的内存中删除客户端对象应该有助于关闭一些不必要的延迟连接。
def finish_bundle(self):
del self.gcs, self.gcs_path_regex