Import error :Python Dataflow Job in cloud composer
Import error :Python Dataflow Job in cloud composer
我可以 运行 将单个文件作为 Cloud Composer 中的数据流作业,但是当我 运行 它作为一个包时它会失败。
pipeline_jobs/
-- __init__.py
-- run.py (main file)
-- setup.py
-- data_pipeline/
----- __init__.py
----- tasks.py
----- transform.py
----- util.py
我收到此错误:
WARNING - File "/tmp/dataflowd232f-run.py", line 14, in <module
{gcp_dataflow_hook.py:120} WARNING - from data_pipeline.tasks import task
WARNING - ImportError: No module named data_pipeline.tasks.
这是 dag 配置:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.strptime("2017-11-01","%Y-%m-%d"),
'py_options': [],
'dataflow_default_options': {
'start-date': '20171101',
'end-date': '20171101',
'project': '<project-id>',
'region': '<location>',
'temp_location': 'gs://<bucket>/flow/tmp',
'staging_location': 'gs://<bucket>/flow/staging',
'setup_file': 'gs://<bucket>/dags/pipeline_jobs/setup.py',
'runner': 'DataFlowRunner',
'job_name': 'job_name_lookup',
'task-id': 'run_pipeline'
},
}
dag = DAG(
dag_id='pipeline_01',
default_args=default_args,
max_active_runs=1,
concurrency =1
)
task_1 = DataFlowPythonOperator(
py_file = 'gs://<bucket>/dags/pipeline_jobs/run.py',
gcp_conn_id='google_cloud_default',
task_id='run_job',
dag=dag)
我尝试将 run.py 放入 dags 文件夹,但仍然出现相同的错误。
任何类型的建议都会很有帮助。
也试过这样做:
来自 pipeline_jobs .data_pipeline.tasks 导入任务
但仍然是同样的问题。
尝试将整个 pipeline_jobs/ 放入此 instruction 之后的 dags 文件夹中,并将数据流 py 文件引用为:/home/airflow/gcs/dags/pipeline_jobs/run.py。
我可以 运行 将单个文件作为 Cloud Composer 中的数据流作业,但是当我 运行 它作为一个包时它会失败。
pipeline_jobs/
-- __init__.py
-- run.py (main file)
-- setup.py
-- data_pipeline/
----- __init__.py
----- tasks.py
----- transform.py
----- util.py
我收到此错误:
WARNING - File "/tmp/dataflowd232f-run.py", line 14, in <module
{gcp_dataflow_hook.py:120} WARNING - from data_pipeline.tasks import task
WARNING - ImportError: No module named data_pipeline.tasks.
这是 dag 配置:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.strptime("2017-11-01","%Y-%m-%d"),
'py_options': [],
'dataflow_default_options': {
'start-date': '20171101',
'end-date': '20171101',
'project': '<project-id>',
'region': '<location>',
'temp_location': 'gs://<bucket>/flow/tmp',
'staging_location': 'gs://<bucket>/flow/staging',
'setup_file': 'gs://<bucket>/dags/pipeline_jobs/setup.py',
'runner': 'DataFlowRunner',
'job_name': 'job_name_lookup',
'task-id': 'run_pipeline'
},
}
dag = DAG(
dag_id='pipeline_01',
default_args=default_args,
max_active_runs=1,
concurrency =1
)
task_1 = DataFlowPythonOperator(
py_file = 'gs://<bucket>/dags/pipeline_jobs/run.py',
gcp_conn_id='google_cloud_default',
task_id='run_job',
dag=dag)
我尝试将 run.py 放入 dags 文件夹,但仍然出现相同的错误。 任何类型的建议都会很有帮助。
也试过这样做: 来自 pipeline_jobs .data_pipeline.tasks 导入任务 但仍然是同样的问题。
尝试将整个 pipeline_jobs/ 放入此 instruction 之后的 dags 文件夹中,并将数据流 py 文件引用为:/home/airflow/gcs/dags/pipeline_jobs/run.py。