Advice/Guidance - composer/beam/dataflow on gcp
- google-cloud-platform
- google-cloud-dataflow
- google-cloud-functions
- apache-beam
- google-cloud-composer
I am trying to learn/try out Cloud Composer/Beam/Dataflow on GCP.
I have written functions to do some basic cleaning of data in Python, and a DAG in Cloud Composer runs this function to download a file from a bucket, process it, and upload it back to the bucket at a set frequency.
It is all custom-written functions. I am now trying to figure out how to use a Beam pipeline and Dataflow instead, and use Cloud Composer to kick off the Dataflow job.
The cleaning I am trying to do is to take a CSV input of col1,col2,col3,col4,col5 and combine the middle 3 columns to output a CSV of col1,combinedcol234,col5.
My questions are...
How do I pull in my own functions within a Beam pipeline to do this merge?
Should I be pulling in my own functions, or does Beam have built-in ways of doing this?
How do I then trigger the pipeline from the DAG?
Does anyone have any example code on GitHub?
I have been googling and trying to research this, but can't seem to find anything that helps me understand it well enough.
Any help would be appreciated. Thank you.
A Beam program is just an ordinary Python program that builds up a pipeline and runs it. For example
'''
def main():
  with beam.Pipeline() as p:
    p | beam.io.ReadFromText(...) | beam.Map(...) | beam.io.WriteToText(...)
'''
Many examples can be found in the repository, and the programming guide (https://beam.apache.org/documentation/programming-guide/) is useful too. The easiest way to read CSV files is with the dataframes API, which returns an object you can manipulate as if it were a Pandas DataFrame, or you can turn it into a PCollection (where each column is an attribute of a named tuple) and process it with Beam's Map, FlatMap, etc., for example
pcoll | beam.Map(
    lambda row: (row.col1, func(row.col2, row.col3, row.col4), row.col5))
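To make the first two questions concrete, here is a minimal sketch of that approach (the bucket paths and the combine_cols helper are my own placeholders, and it assumes the CSV has a header row naming the columns col1..col5): read the CSV with the dataframes API, convert it to a PCollection of named tuples, and apply your own merge function with beam.Map.
'''
# Sketch only: paths are placeholders; the header is assumed to be col1,...,col5.
import apache_beam as beam
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection

def combine_cols(col2, col3, col4):
    # Your own merging/cleaning logic; here it simply concatenates the three values.
    return f'{col2}{col3}{col4}'

with beam.Pipeline() as p:
    df = p | read_csv('gs://<bucket>/input.csv')   # deferred DataFrame, columns taken from the header
    rows = to_pcollection(df)                      # PCollection of named tuples, one per row
    (rows
     | 'MergeMiddleColumns' >> beam.Map(
         lambda row: f'{row.col1},{combine_cols(row.col2, row.col3, row.col4)},{row.col5}')
     | beam.io.WriteToText('gs://<bucket>/output', file_name_suffix='.csv'))
'''
Running this with the default DirectRunner is an easy way to check the merge locally before submitting it to Dataflow.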
You can use the DataflowCreatePythonJobOperator in Python to run the Dataflow job.
- You have to instantiate your Cloud Composer environment;
- add the Dataflow job file to a bucket;
- add the input file to the bucket;
- add the following DAG in the DAGs directory of the Composer environment:
composer_dataflow_dag.py:
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from airflow.utils.dates import days_ago
import pytz

bucket_path = "gs://<bucket name>"
project_id = "<project name>"
gce_zone = "us-central1-a"

tz = pytz.timezone('US/Pacific')
tstmp = datetime.datetime.now(tz).strftime('%Y%m%d%H%M%S')

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

with models.DAG(
    "composer_dataflow_dag",
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_mastertable = DataflowCreatePythonJobOperator(
        task_id="create_mastertable",
        py_file='gs://<bucket name>/dataflow-job.py',
        options={
            "runner": "DataflowRunner",
            "project": project_id,
            "region": "us-central1",
            "temp_location": "gs://<bucket name>/",
            "staging_location": "gs://<bucket name>/",
        },
        job_name=f'job{tstmp}',
        location='us-central1',
        wait_until_finished=True,
    )
Here is the Dataflow job file, with the modification you want, concatenating some of the columns:
dataflow-job.py:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime
import pytz

# Timestamp used to give each run a distinct output file name.
tz = pytz.timezone('US/Pacific')
tstmp = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')

bucket_path = "gs://<bucket>"
input_file = f'{bucket_path}/inputFile.txt'
output = f'{bucket_path}/output_{tstmp}.txt'

p = beam.Pipeline(options=PipelineOptions())
(p | 'Read from a File' >> beam.io.ReadFromText(input_file, skip_header_lines=1)
   | beam.Map(lambda x: x.split(","))                         # split each CSV line into fields
   | beam.Map(lambda x: f'{x[0]},{x[1]}{x[2]}{x[3]},{x[4]}')  # col1, combinedcol234, col5
   | beam.io.WriteToText(output))
p.run().wait_until_finish()
After it runs, the result will be stored in the GCS bucket.
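If you want to sanity-check the column merge without touching GCS, a minimal local sketch (the sample rows below are made up) runs the same Map steps with the default DirectRunner:
'''
import apache_beam as beam

# In-memory input instead of ReadFromText; no GCP resources needed.
with beam.Pipeline() as p:
    (p
     | beam.Create(['a,b,c,d,e', '1,2,3,4,5'])
     | beam.Map(lambda x: x.split(','))
     | beam.Map(lambda x: f'{x[0]},{x[1]}{x[2]}{x[3]},{x[4]}')
     | beam.Map(print))   # expected output lines: 'a,bcd,e' and '1,234,5'
'''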