Advice/Guidance - composer/beam/dataflow on gcp

I'm trying to learn/try out Cloud Composer/Beam/Dataflow on GCP.

I've written functions to do some basic cleaning of data in Python, and I use a DAG in Cloud Composer to run this function, which downloads a file from a bucket, processes it, and uploads it back to a bucket at a set frequency.

It's all custom-written functions. I'm now trying to figure out how to use a Beam pipeline and Dataflow instead, and to use Cloud Composer to kick off the Dataflow job.

The cleaning I'm trying to do is to take a CSV input of col1,col2,col3,col4,col5 and combine the middle three columns to output a CSV of col1,combinedcol234,col5.

My question is...

I've been Googling and trying to research this, but can't seem to find anything that helps me understand it well enough.

Any help would be greatly appreciated. Thanks.

A Beam program is just an ordinary Python program that builds up a pipeline and runs it. For example:

def main():
    with beam.Pipeline() as p:
        p | beam.io.ReadFromText(...) | beam.Map(...) | beam.io.WriteToText(...)
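
Filled in, a minimal runnable version of that skeleton could look like the following (a sketch; the file names are placeholders, not from the post):

import apache_beam as beam


def main():
    # The with-block runs the pipeline on exit; with no runner specified this uses the
    # local DirectRunner, and the same code runs on Dataflow when Dataflow options are passed.
    with beam.Pipeline() as p:
        (p
         | beam.io.ReadFromText('input.txt')   # placeholder input path
         | beam.Map(str.strip)                 # any per-element transform goes here
         | beam.io.WriteToText('output'))      # placeholder output prefix


if __name__ == '__main__':
    main()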

Many examples can be found in the repository, and the programming guide is useful too: https://beam.apache.org/documentation/programming-guide/ . The easiest way to read CSV files is with the dataframes API, which returns an object you can manipulate as if it were a Pandas DataFrame, or you can turn it into a PCollection (where each column is an attribute of a named tuple) and process it with Beam's Map, FlatMap, etc. For example:

pcoll | beam.Map(
    lambda row: (row.col1, func(row.col2, row.col3, row.col4), row.col5))
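
As a sketch of the dataframes route for this particular case, assuming the CSV has a header row with columns named col1 through col5 (the paths are placeholders):

import apache_beam as beam
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection

with beam.Pipeline() as p:
    # read_csv returns a deferred dataframe whose columns come from the CSV header
    df = p | read_csv('input.csv')
    # to_pcollection turns it into a PCollection of named tuples, one attribute per column
    rows = to_pcollection(df)
    (rows
     | beam.Map(lambda row: f'{row.col1},{row.col2}{row.col3}{row.col4},{row.col5}')
     | beam.io.WriteToText('output'))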

You can use the DataflowCreatePythonJobOperator to run a Dataflow job written in Python:

  • You have to instantiate your Cloud Composer environment;
  • Add the Dataflow job file to a bucket;
  • Add the input file to a bucket;
  • Add the following DAG in the DAGs folder of the Composer environment:

composer_dataflow_dag.py:

import datetime

import pytz

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from airflow.utils.dates import days_ago

bucket_path = "gs://<bucket name>"
project_id = "<project name>"
gce_zone = "us-central1-a"

tz = pytz.timezone('US/Pacific')
# Timestamp used to build a unique Dataflow job name for each run
tstmp = datetime.datetime.now(tz).strftime('%Y%m%d%H%M%S')

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}


with models.DAG(
    "composer_dataflow_dag",
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    create_mastertable = DataflowCreatePythonJobOperator(
        task_id="create_mastertable",
        py_file="gs://<bucket name>/dataflow-job.py",
        options={
            "runner": "DataflowRunner",
            "project": project_id,
            "region": "us-central1",
            "temp_location": "gs://<bucket name>/",
            "staging_location": "gs://<bucket name>/",
        },
        job_name=f'job{tstmp}',
        location='us-central1',
        wait_until_finished=True,
    )

Here is the Dataflow job file, which includes the modification you want in order to concatenate some of the columns:

dataflow-job.py

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime
import pytz

tz = pytz.timezone('US/Pacific')
tstmp = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')

bucket_path = "gs://<bucket>"
input_file = f'{bucket_path}/inputFile.txt'
output = f'{bucket_path}/output_{tstmp}.txt'


p = beam.Pipeline(options=PipelineOptions())

(p
    | 'Read from a File' >> beam.io.ReadFromText(input_file, skip_header_lines=1)
    | beam.Map(lambda x: x.split(","))                          # split each CSV line into its columns
    | beam.Map(lambda x: f'{x[0]},{x[1]}{x[2]}{x[3]},{x[4]}')   # merge col2, col3 and col4 into one column
    | beam.io.WriteToText(output))

p.run().wait_until_finish()

After it runs, the result will be stored in the GCS bucket.
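
As a side note, PipelineOptions() with no arguments picks up the command-line flags the script was launched with, which is how the --runner, --project, --region and temp/staging locations passed by the Composer operator reach the job. For a quick local sanity check of the transform you can pass the flags explicitly instead; a minimal sketch (local file names are placeholders):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Force the local DirectRunner for testing; on Dataflow the operator supplies
# --runner=DataflowRunner and the other flags at launch instead.
options = PipelineOptions(['--runner=DirectRunner'])

with beam.Pipeline(options=options) as p:
    (p
     | beam.io.ReadFromText('sample_input.csv', skip_header_lines=1)  # placeholder local file
     | beam.Map(lambda x: x.split(","))
     | beam.Map(lambda x: f'{x[0]},{x[1]}{x[2]}{x[3]},{x[4]}')
     | beam.io.WriteToText('sample_output'))                          # placeholder local prefix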