没有名为 airfow.gcp 的模块 - 如何 运行 使用 python3/beam 2.15 的数据流作业?
No module named airfow.gcp - how to run dataflow job that uses python3/beam 2.15?
当我像 BigQueryHook 一样使用 operators/hooks 时,我看到一条消息,指出这些运算符已被弃用并使用 airflow.gcp... 运算符版本。然而,当我尝试在我的 dag 中使用它时,它失败了,并说没有名为 airflow.gcp 的模块。我有最新的带有 beta 功能的 airflow composer 版本,python3。是否可以通过某种方式安装这些运算符?
我正在尝试使用 beam 2.15 运行 python 3 中的数据流作业。我试过 virtualenv 运算符,但这不起作用,因为它只允许 python2.7。我该怎么做?
Composer 中可用的最新 Airflow 版本是 1.10.2 或 1.10.3(取决于地区)。到那时,这些运算符在 contrib
部分。
关注如何使用 Composer 运行 Python 3 个数据流作业,您需要发布新版本。但是,如果您需要立即解决方案,可以尝试向后移植 fix.
在这种情况下,我定义了一个 DataFlow3Hook
,它扩展了正常的 DataFlowHook
,但它不会在 start_python_dataflow
方法中硬编码 python2
:
class DataFlow3Hook(DataFlowHook):
def start_python_dataflow(
...
py_interpreter: str = "python3"
):
...
self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
label_formatter)
然后我们将让我们的自定义 DataFlowPython3Operator
调用新的钩子:
class DataFlowPython3Operator(DataFlowPythonOperator):
def execute(self, context):
...
hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
...
hook.start_python_dataflow(
self.job_name, formatted_options,
self.py_file, self.py_options, py_interpreter="python3")
最后,在我们的 DAG 中,我们只使用 new 运算符:
task = DataFlowPython3Operator(
py_file='/home/airflow/gcs/data/main.py',
task_id=JOB_NAME,
dag=dag)
查看完整代码here。作业 运行s Python 3.6:
使用的环境详细信息和依赖项(Beam 作业是一个最小示例):
softwareConfig:
imageVersion: composer-1.8.0-airflow-1.10.3
pypiPackages:
apache-beam: ==2.15.0
google-api-core: ==1.14.3
google-apitools: ==0.5.28
google-cloud-core: ==1.0.3
pythonVersion: '3'
让我知道这是否适合您。如果是这样,我建议将代码移至插件以提高代码可读性并跨 DAG 重用它。
作为替代方案,您可以在较旧的气流版本上使用 PythonVirtualenvOperator
。给定一些光束管道(包装在一个函数中)保存为 dataflow_python3.py
:
def main():
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import argparse
import logging
class ETL(beam.DoFn):
def process(self, row):
#do data processing
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://bucket/input/input.txt',
help='Input file to process.'
)
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=project_id',
'--region=region',
'--staging_location=gs://bucket/staging/',
'--temp_location=gs://bucket/temp/',
'--job_name=job_id',
'--setup_file=./setup.py'
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
logging.getLogger().setLevel(logging.DEBUG)
run()
您可以 运行 使用以下 DAG 文件:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator
import sys
import dataflow_python3 as py3 #import your beam pipeline file here
default_args = {
'owner': 'John Smith',
'depends_on_past': False,
'start_date': datetime(2016, 1, 1),
'email': ['email@gmail.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=1),
}
CONNECTION_ID = 'proj_id'
with DAG('Dataflow_Python3', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:
dataflow_python3 = PythonVirtualenvOperator(
task_id='dataflow_python3',
python_callable=py3.main, #this is your beam pipeline callable
requirements=['apache-beam[gcp]', 'pandas'],
python_version=3,
dag=dag
)
dataflow_python3
我有 运行 Python 3 Beam -2.17 通过使用 DataflowTemplateOperator 并且它非常有效。
使用以下命令创建模板:
python3 -m scriptname --runner DataflowRunner --project project_id --staging_location staging_location --temp_location temp_location --template_location template_location/script_metadata --region region --experiments use_beam_bq_sink --no_use_public_ips --subnetwork=subnetwork
scriptname 将是您的 Dataflow Python 文件的名称(不带 .py 扩展名)
--template_location - 创建数据流模板的位置,不要在其中放置任何扩展名,如 .json。简单地说,scriptname_metadata 就可以了。
--experiments use_beam_bq_sink - 如果您的接收器是 BigQuery,则将使用此参数,否则您可以将其删除。
import datetime as dt
import time
from airflow.models import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
lasthour = dt.datetime.now() - dt.timedelta(hours=1)
args = {
'owner': 'airflow',
'start_date': lasthour,
'depends_on_past': False,
'dataflow_default_options': {
'project': "project_id",
'staging_location': "staging_location",
'temp_location': "temp_location",
'region': "region",
'runner': "DataflowRunner",
'job_name': 'job_name' + str(time.time()),
},
}
dag = DAG(
dag_id='employee_dataflow_dag',
schedule_interval=None,
default_args=args
)
Dataflow_Run = DataflowTemplateOperator(
task_id='dataflow_pipeline',
template='template_location/script_metadata',
parameters ={
'input':"employee.csv",
'output':'project_id:dataset_id.table',
'region':"region"
},
gcp_conn_id='google_cloud_default',
poll_sleep=15,
dag=dag
)
Dataflow_Run
当我像 BigQueryHook 一样使用 operators/hooks 时,我看到一条消息,指出这些运算符已被弃用并使用 airflow.gcp... 运算符版本。然而,当我尝试在我的 dag 中使用它时,它失败了,并说没有名为 airflow.gcp 的模块。我有最新的带有 beta 功能的 airflow composer 版本,python3。是否可以通过某种方式安装这些运算符?
我正在尝试使用 beam 2.15 运行 python 3 中的数据流作业。我试过 virtualenv 运算符,但这不起作用,因为它只允许 python2.7。我该怎么做?
Composer 中可用的最新 Airflow 版本是 1.10.2 或 1.10.3(取决于地区)。到那时,这些运算符在 contrib
部分。
关注如何使用 Composer 运行 Python 3 个数据流作业,您需要发布新版本。但是,如果您需要立即解决方案,可以尝试向后移植 fix.
在这种情况下,我定义了一个 DataFlow3Hook
,它扩展了正常的 DataFlowHook
,但它不会在 start_python_dataflow
方法中硬编码 python2
:
class DataFlow3Hook(DataFlowHook):
def start_python_dataflow(
...
py_interpreter: str = "python3"
):
...
self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
label_formatter)
然后我们将让我们的自定义 DataFlowPython3Operator
调用新的钩子:
class DataFlowPython3Operator(DataFlowPythonOperator):
def execute(self, context):
...
hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
...
hook.start_python_dataflow(
self.job_name, formatted_options,
self.py_file, self.py_options, py_interpreter="python3")
最后,在我们的 DAG 中,我们只使用 new 运算符:
task = DataFlowPython3Operator(
py_file='/home/airflow/gcs/data/main.py',
task_id=JOB_NAME,
dag=dag)
查看完整代码here。作业 运行s Python 3.6:
使用的环境详细信息和依赖项(Beam 作业是一个最小示例):
softwareConfig:
imageVersion: composer-1.8.0-airflow-1.10.3
pypiPackages:
apache-beam: ==2.15.0
google-api-core: ==1.14.3
google-apitools: ==0.5.28
google-cloud-core: ==1.0.3
pythonVersion: '3'
让我知道这是否适合您。如果是这样,我建议将代码移至插件以提高代码可读性并跨 DAG 重用它。
作为替代方案,您可以在较旧的气流版本上使用 PythonVirtualenvOperator
。给定一些光束管道(包装在一个函数中)保存为 dataflow_python3.py
:
def main():
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import argparse
import logging
class ETL(beam.DoFn):
def process(self, row):
#do data processing
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://bucket/input/input.txt',
help='Input file to process.'
)
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=project_id',
'--region=region',
'--staging_location=gs://bucket/staging/',
'--temp_location=gs://bucket/temp/',
'--job_name=job_id',
'--setup_file=./setup.py'
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
logging.getLogger().setLevel(logging.DEBUG)
run()
您可以 运行 使用以下 DAG 文件:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator
import sys
import dataflow_python3 as py3 #import your beam pipeline file here
default_args = {
'owner': 'John Smith',
'depends_on_past': False,
'start_date': datetime(2016, 1, 1),
'email': ['email@gmail.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=1),
}
CONNECTION_ID = 'proj_id'
with DAG('Dataflow_Python3', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:
dataflow_python3 = PythonVirtualenvOperator(
task_id='dataflow_python3',
python_callable=py3.main, #this is your beam pipeline callable
requirements=['apache-beam[gcp]', 'pandas'],
python_version=3,
dag=dag
)
dataflow_python3
我有 运行 Python 3 Beam -2.17 通过使用 DataflowTemplateOperator 并且它非常有效。
使用以下命令创建模板:
python3 -m scriptname --runner DataflowRunner --project project_id --staging_location staging_location --temp_location temp_location --template_location template_location/script_metadata --region region --experiments use_beam_bq_sink --no_use_public_ips --subnetwork=subnetwork
scriptname 将是您的 Dataflow Python 文件的名称(不带 .py 扩展名)
--template_location - 创建数据流模板的位置,不要在其中放置任何扩展名,如 .json。简单地说,scriptname_metadata 就可以了。
--experiments use_beam_bq_sink - 如果您的接收器是 BigQuery,则将使用此参数,否则您可以将其删除。
import datetime as dt
import time
from airflow.models import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
lasthour = dt.datetime.now() - dt.timedelta(hours=1)
args = {
'owner': 'airflow',
'start_date': lasthour,
'depends_on_past': False,
'dataflow_default_options': {
'project': "project_id",
'staging_location': "staging_location",
'temp_location': "temp_location",
'region': "region",
'runner': "DataflowRunner",
'job_name': 'job_name' + str(time.time()),
},
}
dag = DAG(
dag_id='employee_dataflow_dag',
schedule_interval=None,
default_args=args
)
Dataflow_Run = DataflowTemplateOperator(
task_id='dataflow_pipeline',
template='template_location/script_metadata',
parameters ={
'input':"employee.csv",
'output':'project_id:dataset_id.table',
'region':"region"
},
gcp_conn_id='google_cloud_default',
poll_sleep=15,
dag=dag
)
Dataflow_Run