How to iterate all files in google cloud storage to be used as dataflow input?

Use case

I want to parse multiple files from Cloud Storage and insert the results into a BigQuery table.

Reading one specific, named file works fine. However, I am struggling when I switch that single file out for a * glob pattern so that all files are included.
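For concreteness, the only change is the pattern handed to ReadFromText; a minimal sketch (the single file name is hypothetical, the glob matches the default in batch.py below):

# Reading one explicitly named object works fine (file name is made up here):
documents = pipe | 'read' >> ReadFromText('gs://foobar-sink/metrics-0001.json')

# Switching to a glob over the whole bucket is what triggers the problem described below:
documents = pipe | 'read' >> ReadFromText('gs://foobar-sink/*')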

I am running the job like this:

python batch.py --project foobar --job_name foobar-metrics --runner DataflowRunner --staging_location gs://foobar-staging/dataflow --temp_location gs://foobar-staging/dataflow_temp --output foobar.test

This is my first Dataflow experiment, and I am not sure how to debug it or what best practices exist for a pipeline like this.

Expected results

I expect the job to be uploaded to the Dataflow runner, and that collecting the file list and iterating over each file happens in the cloud at runtime. I expect to be able to pass the contents of all files in the same way as when reading a single file.

Actual results

The job blocks already while trying to submit it to the Cloud Dataflow runner.

Contents of batch.py:

"""A metric sink workflow."""

from __future__ import absolute_import

import json
import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions

class ExtractDatapointsFn(beam.DoFn):
    """
    Parse json documents and extract the metrics datapoints.
    """
    def __init__(self):
        super(ExtractDatapointsFn, self).__init__()
        self.total_invalid = Metrics.counter(self.__class__, 'total_invalid')

    def process(self, element):
        """
        Process json that contains metrics of each element.

        Args:
            element: the element being processed.

        Returns:
            unmarshaled json for each metric point.
        """
        try:
            # Catch parsing errors as well as our custom key check.
            document = json.loads(element)
            if not "DataPoints" in document:
                raise ValueError("missing DataPoints")
        except ValueError:
            self.total_invalid.inc(1)
            return

        for point in document["DataPoints"]:
            yield point

def run(argv=None):
    """
    Main entry point; defines and runs the pipeline.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://foobar-sink/*',
                        help='Input file to process.')
    parser.add_argument('--output',
                        required=True,
                        help=(
                            'Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
                            'or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(GoogleCloudOptions)
    pipe = beam.Pipeline(options=pipeline_options)

    # Read the json data and extract the datapoints.
    documents = pipe | 'read' >> ReadFromText(known_args.input)
    metrics = documents | 'extract datapoints' >> beam.ParDo(ExtractDatapointsFn())

    # BigQuery sink table.
    _ = metrics | 'write bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            known_args.output,
            schema='Path:STRING, Value:FLOAT, Timestamp:TIMESTAMP',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

    # Actually run the pipeline (all operations above are deferred).
    result = pipe.run()
    result.wait_until_finish()

    total_invalid_filter = MetricsFilter().with_name('total_invalid')
    query_result = result.metrics().query(total_invalid_filter)
    if query_result['counters']:
        total_invalid_counter = query_result['counters'][0]
        logging.info('number of invalid documents: %d', total_invalid_counter.committed)
    else:
        logging.info('no invalid documents were found')

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

We perform size estimation of sources at job submission so that the Dataflow service can use that information when initializing the job (for example, to determine the initial number of workers). To estimate the size of a glob, we need to expand the glob. This can take some time (I believe several minutes for GCS) if the glob expands to more than 100k files. We will look into ways we can improve the user experience here.
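One way to sidestep that submission-time expansion (not part of the answer above, just a minimal sketch) is to ship the glob into the pipeline as data and let the workers expand and read it at runtime. It assumes a Beam version that exposes apache_beam.io.filesystems.FileSystems (note the module path differs from the apache_beam.utils imports used in batch.py), and the DoFn names are hypothetical:

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems

class ExpandPatternFn(beam.DoFn):
    """Expand a glob into individual file paths on the workers, at runtime."""
    def process(self, pattern):
        for match_result in FileSystems.match([pattern]):
            for metadata in match_result.metadata_list:
                yield metadata.path

class ReadFileFn(beam.DoFn):
    """Read one object and emit it line by line."""
    def process(self, path):
        # Reads the whole object into memory; fine for a sketch, not for huge files.
        handle = FileSystems.open(path)
        try:
            for line in handle.read().splitlines():
                yield line
        finally:
            handle.close()

# The glob travels through the pipeline as an element, so nothing is expanded
# while the job is being submitted.
documents = (pipe
             | 'pattern' >> beam.Create(['gs://foobar-sink/*'])
             | 'expand glob' >> beam.ParDo(ExpandPatternFn())
             | 'read files' >> beam.ParDo(ReadFileFn()))

This chain would take the place of the 'read' >> ReadFromText(...) step in batch.py; the 'extract datapoints' ParDo and the BigQuery sink stay unchanged.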