How can I run a Dataflow job from Datalab in Python?

I am having some trouble running a Dataflow job from Datalab. What would help is a minimal working Python code example for this situation, since there doesn't seem to be one in the Google Cloud Platform or Apache Beam documentation.

It would be very helpful if I could see some Python code that I can run from a Datalab cell and that does the following:

# 1. Sets up the job

# 2. Defines the processing logic to be applied to the input data files

# 3. Saves the processed files to an output folder

# 4. Submits the job to Google Cloud Dataflow

To work towards this, I tried taking the word count examples from the Google and Apache documentation and adapting them for use in Datalab. The resulting code is below, but it is not clear to me which parts I can strip out to turn it into a truly minimal working example.

from __future__ import absolute_import
import argparse
import logging
import re
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None):
  """Main entry point; defines and runs the wordcount pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      default='gs://data-analytics/kinglear.txt',
                      help='Input file to process.')
  parser.add_argument('--output',
                      dest='output',
                      default='gs://data-analytics/output',
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      '--runner=DataflowRunner',
      '--project=project',
      '--staging_location=gs://staging',
      '--temp_location=gs://tmp',
      '--job_name=your-wordcount-job',
  ])

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  with beam.Pipeline(options=pipeline_options) as p:

    # Read the text file[pattern] into a PCollection.
    lines = p | ReadFromText(known_args.input)

    # Count the occurrences of each word.
    counts = (
        lines
        | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    # Format the counts into a PCollection of strings.
    def format_result(word_count):
      (word, count) = word_count
      return '%s: %s' % (word, count)

    output = counts | 'Format' >> beam.Map(format_result)

    # Write the output using a "Write" transform that has side effects.
    output | WriteToText(known_args.output)

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
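
(In a Datalab cell I assume I would just call run() directly instead of relying on the __main__ block, for example like this with the bucket paths replaced by real ones, but I am not sure whether that is the right approach.)

run(['--input=gs://data-analytics/kinglear.txt',
     '--output=gs://data-analytics/output'])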

Thanks in advance!

Josh

I think you are confusing what Datalab does with what Dataflow does. They are two different programming platforms and you are mixing them together. Regarding your comment "Defines the processing logic to be applied to the input data files": the processing logic is supplied by the source code (or template) you submit to Cloud Dataflow, not by code running in a Cloud Datalab notebook.

As an option: if you install the Cloud Dataflow libraries and use Python 2.x, you can write Cloud Dataflow (Apache Beam) code in a Datalab notebook. That code will run locally inside Datalab and will not launch a Dataflow job.
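
For example, a small local pipeline run inside a notebook cell might look something like this (a sketch on my part; the DirectRunner and the toy in-memory input are only there to illustrate local execution):

from __future__ import print_function

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# DirectRunner executes the pipeline in-process, i.e. locally inside Datalab.
local_options = PipelineOptions(['--runner=DirectRunner'])

with beam.Pipeline(options=local_options) as p:
    (p
     | 'Create' >> beam.Create(['hello', 'world', 'hello'])
     | 'PairWithOne' >> beam.Map(lambda w: (w, 1))
     | 'CountPerWord' >> beam.CombinePerKey(sum)
     | 'Print' >> beam.Map(print))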

Here are some links to help you write software that creates a Cloud Dataflow job.

This is a Stack Overflow answer that shows you how to launch a Dataflow job in Python:

Google's Dataflow documentation for Java, but it has a good explanation of the required steps:

Method: projects.jobs.list

Here is a link to the Dataflow Python client API:

Dataflow Client API
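
For illustration, the client library can call the projects.jobs.list method mentioned above to enumerate a project's Dataflow jobs. This is only a sketch: it assumes the google-api-python-client package is installed and Application Default Credentials are available, and 'your-project-id' is a placeholder.

from __future__ import print_function

from googleapiclient.discovery import build

# Build a client for the Dataflow REST API (version v1b3).
dataflow = build('dataflow', 'v1b3')

# projects.jobs.list returns the Dataflow jobs in the given project.
response = dataflow.projects().jobs().list(projectId='your-project-id').execute()

for job in response.get('jobs', []):
    print(job['name'], job['currentState'])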

I solved this with the help of the tutorial here: https://github.com/hayatoy/dataflow-tutorial and can now launch a Dataflow job from Datalab with the code below.

import apache_beam as beam

# Pipeline options:
options                         = beam.options.pipeline_options.PipelineOptions()
gcloud_options                  = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name         = 'test'
gcloud_options.project          = 'project'
gcloud_options.staging_location = 'gs://staging'
gcloud_options.temp_location    = 'gs://tmp'
gcloud_options.region           = 'europe-west2'

# Worker options:
worker_options                  = options.view_as(beam.options.pipeline_options.WorkerOptions)
worker_options.disk_size_gb     = 30
worker_options.max_num_workers  = 10

# Standard options:
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'

# Pipeline:
PL = beam.Pipeline(options=options)

(
      PL | 'read'  >> beam.io.ReadFromText('gs://input.txt')
         | 'write' >> beam.io.WriteToText('gs://output.txt', num_shards=1)
)

PL.run()
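
If you want the notebook cell to block until the job has finished, I believe you can keep the result of run() and wait on it instead of calling PL.run() on its own (optional; this is just how I understand the Beam API, not something the tutorial required):

# Keep a handle on the submitted job and wait for it to complete.
result = PL.run()
result.wait_until_finish()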

Thanks,

Josh