如何迭代 google 云存储中的所有文件以用作数据流输入?
How to iterate all files in google cloud storage to be used as dataflow input?
用例
我想从云存储中解析多个文件并将结果插入 BigQuery table。
选择一个特定的文件来阅读效果很好。但是,在使用 *
glob 模式切换出一个文件以包含所有文件时,我遇到了困难。
我正在执行这样的工作:
python batch.py --project foobar --job_name foobar-metrics --runner DataflowRunner --staging_location gs://foobar-staging/dataflow --temp_location gs://foobar-staging/dataflow_temp --output foobar.test
这是第一个 Dataflow 实验,我不确定如何调试它或者对于这样的管道有哪些最佳实践。
预期结果
我希望作业被上传到 Dataflow runner,并且收集文件列表、迭代每个文件的过程将在云中的运行时发生。我希望能够像读取单个文件时那样传递所有文件的内容。
实际结果
作业在尝试提交到 Cloud Dataflow runner 时被阻塞。
batch.py 的内容:
"""A metric sink workflow."""
from __future__ import absolute_import
import json
import argparse
import logging
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions
class ExtractDatapointsFn(beam.DoFn):
    """Parse JSON documents and extract the metrics datapoints.

    Each input element is expected to be a JSON-encoded string whose
    top-level object contains a "DataPoints" key holding an iterable of
    metric points. Invalid elements (unparseable JSON, documents without
    "DataPoints", or documents that are not dict-like) are counted in the
    ``total_invalid`` counter and dropped.
    """
    def __init__(self):
        super(ExtractDatapointsFn, self).__init__()
        # Beam counter tracking how many elements were dropped as invalid.
        self.total_invalid = Metrics.counter(self.__class__, 'total_invalid')

    def process(self, element):
        """
        Process json that contains metrics of each element.
        Args:
            element: the element being processed (a JSON string).
        Yields:
            unmarshaled json for each metric point.
        """
        try:
            # json.loads raises ValueError on malformed input; we raise the
            # same type for the missing-key case so one handler covers both.
            document = json.loads(element)
            if "DataPoints" not in document:
                raise ValueError("missing DataPoints")
        except (ValueError, TypeError):
            # TypeError guards against JSON documents that are not dict-like
            # (e.g. a bare number), where the `in` test itself would raise
            # and crash the pipeline instead of counting the bad element.
            self.total_invalid.inc(1)
            return
        for point in document["DataPoints"]:
            yield point
def run(argv=None):
    """Main entry point; defines and runs the metric-sink pipeline.

    Reads JSON documents from the ``--input`` glob, extracts their
    "DataPoints" entries, and writes them to the ``--output`` BigQuery
    table. After the pipeline finishes, logs how many invalid documents
    were dropped.

    Args:
        argv: optional argument list; anything the local parser does not
            recognize is forwarded to Beam as pipeline options.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://foobar-sink/*',
                        help='Input file to process.')
    parser.add_argument('--output',
                        required=True,
                        help=(
                            'Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
                            'or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    # NOTE: the original also called pipeline_options.view_as(GoogleCloudOptions)
    # and discarded the result — a no-op, removed here.
    pipe = beam.Pipeline(options=pipeline_options)
    # Read the json data and extract the datapoints.
    documents = pipe | 'read' >> ReadFromText(known_args.input)
    metrics = documents | 'extract datapoints' >> beam.ParDo(ExtractDatapointsFn())
    # BigQuery sink table.
    _ = metrics | 'write bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            known_args.output,
            schema='Path:STRING, Value:FLOAT, Timestamp:TIMESTAMP',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    # Actually run the pipeline (all operations above are deferred).
    result = pipe.run()
    result.wait_until_finish()
    # Query the custom counter to report how many documents were dropped.
    total_invalid_filter = MetricsFilter().with_name('total_invalid')
    query_result = result.metrics().query(total_invalid_filter)
    if query_result['counters']:
        total_invalid_counter = query_result['counters'][0]
        logging.info('number of invalid documents: %d', total_invalid_counter.committed)
    else:
        logging.info('no invalid documents were found')
if __name__ == '__main__':
    # Emit INFO-level logs (e.g. the invalid-document count) when run as a script.
    logging.getLogger().setLevel(logging.INFO)
    run()
我们在作业提交时对来源进行大小估计,以便 Dataflow 服务可以在初始化作业时使用该信息(例如,确定初始工作器数量)。要估计 glob 的大小,我们需要扩展 glob。如果 glob 扩展到超过 100k 个文件,这可能需要一些时间(我相信 GCS 需要几分钟)。我们将在此处研究可以改善用户体验的方法。
用例
我想从云存储中解析多个文件并将结果插入 BigQuery table。
选择一个特定的文件来阅读效果很好。但是,在使用 *
glob 模式切换出一个文件以包含所有文件时,我遇到了困难。
我正在执行这样的工作:
python batch.py --project foobar --job_name foobar-metrics --runner DataflowRunner --staging_location gs://foobar-staging/dataflow --temp_location gs://foobar-staging/dataflow_temp --output foobar.test
这是第一个 Dataflow 实验,我不确定如何调试它或者对于这样的管道有哪些最佳实践。
预期结果
我希望作业被上传到 Dataflow runner,并且收集文件列表、迭代每个文件的过程将在云中的运行时发生。我希望能够像读取单个文件时那样传递所有文件的内容。
实际结果
作业在尝试提交到 Cloud Dataflow runner 时被阻塞。
batch.py 的内容:
"""A metric sink workflow."""
from __future__ import absolute_import
import json
import argparse
import logging
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions
class ExtractDatapointsFn(beam.DoFn):
    """Parse JSON documents and extract the metrics datapoints.

    Input elements are JSON strings whose top-level object must contain a
    "DataPoints" key with an iterable of metric points. Elements that do
    not parse, lack the key, or are not dict-like are counted via the
    ``total_invalid`` counter and skipped.
    """
    def __init__(self):
        super(ExtractDatapointsFn, self).__init__()
        # Counts elements dropped because they could not be parsed/validated.
        self.total_invalid = Metrics.counter(self.__class__, 'total_invalid')

    def process(self, element):
        """
        Process json that contains metrics of each element.
        Args:
            element: the element being processed (a JSON string).
        Yields:
            unmarshaled json for each metric point.
        """
        try:
            # Catch parsing errors as well as our custom key check; raising
            # ValueError for the missing key reuses the same handler.
            document = json.loads(element)
            if "DataPoints" not in document:
                raise ValueError("missing DataPoints")
        except (ValueError, TypeError):
            # TypeError covers non-dict JSON (e.g. a bare number) where the
            # membership test would otherwise raise and crash the pipeline.
            self.total_invalid.inc(1)
            return
        for point in document["DataPoints"]:
            yield point
def run(argv=None):
    """Main entry point; defines and runs the metric-sink pipeline.

    Reads JSON documents matching the ``--input`` glob, extracts their
    "DataPoints", writes them to the ``--output`` BigQuery table, then
    logs the number of invalid documents that were dropped.

    Args:
        argv: optional argument list; unrecognized arguments are passed
            through to Beam as pipeline options.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://foobar-sink/*',
                        help='Input file to process.')
    parser.add_argument('--output',
                        required=True,
                        help=(
                            'Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
                            'or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    # NOTE: a discarded pipeline_options.view_as(GoogleCloudOptions) call
    # (a no-op) was removed from the original.
    pipe = beam.Pipeline(options=pipeline_options)
    # Read the json data and extract the datapoints.
    documents = pipe | 'read' >> ReadFromText(known_args.input)
    metrics = documents | 'extract datapoints' >> beam.ParDo(ExtractDatapointsFn())
    # BigQuery sink table.
    _ = metrics | 'write bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            known_args.output,
            schema='Path:STRING, Value:FLOAT, Timestamp:TIMESTAMP',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    # Actually run the pipeline (all operations above are deferred).
    result = pipe.run()
    result.wait_until_finish()
    # Report the custom invalid-document counter collected by the DoFn.
    total_invalid_filter = MetricsFilter().with_name('total_invalid')
    query_result = result.metrics().query(total_invalid_filter)
    if query_result['counters']:
        total_invalid_counter = query_result['counters'][0]
        logging.info('number of invalid documents: %d', total_invalid_counter.committed)
    else:
        logging.info('no invalid documents were found')
if __name__ == '__main__':
    # Enable INFO logging so the final invalid-document count is visible.
    logging.getLogger().setLevel(logging.INFO)
    run()
我们在作业提交时对来源进行大小估计,以便 Dataflow 服务可以在初始化作业时使用该信息(例如,确定初始工作器数量)。要估计 glob 的大小,我们需要扩展 glob。如果 glob 扩展到超过 100k 个文件,这可能需要一些时间(我相信 GCS 需要几分钟)。我们将在此处研究可以改善用户体验的方法。