Airflow's `BeamRunPythonPipelineOperator` can't install `py_requirements`
- Airflow worker's `Dockerfile` (running on an Airflow Kubernetes cluster):
```dockerfile
FROM apache/airflow:2.1.1-python3.8

USER root
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        build-essential gcc git \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*
USER airflow
```
- Library versions:
```
apache-airflow[amazon,docker,slack,google,postgres,kubernetes]==2.1.0
google-cloud-bigquery[bqstorage,pandas]==2.13.1
apache-airflow-providers-apache-beam~=3.0.0
```
- DAG code:
```python
tfdv_operator = BeamRunPythonPipelineOperator(
    task_id="run_tfdv_on_dataflow",
    py_file="gs://my-datasets/tfdv/beam_pipeline_tfdv.py",
    pipeline_options={
        "train_stats_output_path": "gs://my-datasets/tfdv/train_stats.tfrecord",
        "test_stats_output_path": "gs://my-datasets/tfdv/test_stats.tfrecord",
    },
    py_options=[],
    py_requirements=[
        "apache-beam[gcp]==2.31.0",
        "pyarrow==2.0.0",
        "tensorflow-data-validation==1.1.0",
        "tfx-bsl==1.1.1",
    ],
    py_interpreter="python3.8",
    py_system_site_packages=False,
    runner="DataflowRunner",
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id="my-research",
        location="asia-northeast3",
    ),
    default_pipeline_options={
        "temp_location": "gs://my-datasets/tfdv/dataflow_job_tmp/",
        "staging_location": "gs://my-datasets/tfdv/dataflow_job_staging/",
    },
)
```
When I run the DAG, the task fails. Logs:
```
*** Log file does not exist: /opt/airflow/logs/tfdv_dag/run_tfdv_on_dataflow/2021-08-15T06:30:51.833077+00:00/1.log
*** Fetching from: http://airflow-worker-0.airflow-worker.airflow-playground.svc.cluster.local:8793/log/tfdv_dag/run_tfdv_on_dataflow/2021-08-15T06:30:51.833077+00:00/1.log
[2021-08-15 06:30:53,088] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: tfdv_dag.run_tfdv_on_dataflow 2021-08-15T06:30:51.833077+00:00 [queued]>
[2021-08-15 06:30:53,103] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: tfdv_dag.run_tfdv_on_dataflow 2021-08-15T06:30:51.833077+00:00 [queued]>
[2021-08-15 06:30:53,103] {taskinstance.py:1067} INFO -
--------------------------------------------------------------------------------
[2021-08-15 06:30:53,103] {taskinstance.py:1068} INFO - Starting attempt 1 of 1
[2021-08-15 06:30:53,104] {taskinstance.py:1069} INFO -
--------------------------------------------------------------------------------
[2021-08-15 06:30:53,131] {taskinstance.py:1087} INFO - Executing <Task(BeamRunPythonPipelineOperator): run_tfdv_on_dataflow> on 2021-08-15T06:30:51.833077+00:00
[2021-08-15 06:30:53,136] {standard_task_runner.py:52} INFO - Started process 2304 to run task
[2021-08-15 06:30:53,140] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'tfdv_dag', 'run_tfdv_on_dataflow', '2021-08-15T06:30:51.833077+00:00', '--job-id', '679', '--pool', 'default_pool', '--raw', '--subdir', '/opt/***/dags/repo/dags/tfdv_dag.py', '--cfg-path', '/tmp/tmp5wikr_ja', '--error-file', '/tmp/tmpsq7cctia']
[2021-08-15 06:30:53,141] {standard_task_runner.py:77} INFO - Job 679: Subtask run_tfdv_on_dataflow
[2021-08-15 06:30:53,290] {logging_mixin.py:104} INFO - Running <TaskInstance: tfdv_dag.run_tfdv_on_dataflow 2021-08-15T06:30:51.833077+00:00 [running]> on host ***-worker-0.***-worker.***-playground.svc.cluster.local
[2021-08-15 06:30:53,402] {taskinstance.py:1280} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=tfdv_dag
AIRFLOW_CTX_TASK_ID=run_tfdv_on_dataflow
AIRFLOW_CTX_EXECUTION_DATE=2021-08-15T06:30:51.833077+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-15T06:30:51.833077+00:00
[2021-08-15 06:30:53,779] {gcs.py:310} INFO - File downloaded to /tmp/tmptqjahx3vbeam_pipeline_tfdv.py
[2021-08-15 06:30:53,779] {process_utils.py:135} INFO - Executing cmd: virtualenv /tmp/apache-beam-venvborjpiov --python=python3.8
[2021-08-15 06:30:53,787] {process_utils.py:139} INFO - Output:
[2021-08-15 06:30:54,085] {process_utils.py:143} INFO - created virtual environment CPython3.8.10.final.0-64 in 177ms
[2021-08-15 06:30:54,086] {process_utils.py:143} INFO - creator CPython3Posix(dest=/tmp/apache-beam-venvborjpiov, clear=False, no_vcs_ignore=False, global=False)
[2021-08-15 06:30:54,086] {process_utils.py:143} INFO - seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/home/***/.local/share/virtualenv)
[2021-08-15 06:30:54,086] {process_utils.py:143} INFO - added seed packages: pip==21.1.3, setuptools==57.2.0, wheel==0.36.2
[2021-08-15 06:30:54,086] {process_utils.py:143} INFO - activators BashActivator,CShellActivator,FishActivator,PowerShellActivator,PythonActivator,XonshActivator
[2021-08-15 06:30:54,099] {process_utils.py:135} INFO - Executing cmd: /tmp/apache-beam-venvborjpiov/bin/pip install 'apache-beam[gcp]==2.31.0' pyarrow==2.0.0 tensorflow-data-validation==1.1.0 tfx-bsl==1.1.1
[2021-08-15 06:30:54,107] {process_utils.py:139} INFO - Output:
[2021-08-15 06:30:54,824] {process_utils.py:143} INFO - ERROR: Can not perform a '--user' install. User site-packages are not visible in this virtualenv.
[2021-08-15 06:30:54,993] {process_utils.py:143} INFO - WARNING: You are using pip version 21.1.3; however, version 21.2.4 is available.
[2021-08-15 06:30:54,993] {process_utils.py:143} INFO - You should consider upgrading via the '/tmp/apache-beam-venvborjpiov/bin/python -m pip install --upgrade pip' command.
[2021-08-15 06:30:55,056] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/apache/beam/operators/beam.py", line 242, in execute
    self.beam_hook.start_python_pipeline(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 243, in start_python_pipeline
    py_interpreter = prepare_virtualenv(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/python_virtualenv.py", line 98, in prepare_virtualenv
    execute_in_subprocess(pip_cmd)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/process_utils.py", line 147, in execute_in_subprocess
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/tmp/apache-beam-venvborjpiov/bin/pip', 'install', 'apache-beam[gcp]==2.31.0', 'pyarrow==2.0.0', 'tensorflow-data-validation==1.1.0', 'tfx-bsl==1.1.1']' returned non-zero exit status 1.
[2021-08-15 06:30:55,058] {taskinstance.py:1524} INFO - Marking task as FAILED. dag_id=tfdv_dag, task_id=run_tfdv_on_dataflow, execution_date=20210815T063051, start_date=20210815T063053, end_date=20210815T063055
[2021-08-15 06:30:55,113] {local_task_job.py:151} INFO - Task exited with return code 1
```
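The pip failure in the log can be reproduced outside Airflow. Here is a minimal sketch, under the assumption that something in the worker environment forces pip into user-install mode (for example a `PIP_USER=true` environment variable; the exact trigger on the worker may differ). pip refuses a `--user` install inside a virtualenv created without system site-packages, and it aborts before even contacting PyPI:

```python
import os
import subprocess
import tempfile
import venv

# Create a virtualenv WITHOUT system site-packages, mirroring what the
# Beam hook does when py_system_site_packages=False.
tmp = tempfile.mkdtemp()
venv.EnvBuilder(system_site_packages=False, with_pip=True).create(tmp)

# Emulate an environment that forces user installs (hypothetical trigger).
env = dict(os.environ, PIP_USER="true")

# Any install attempt now fails immediately with the same error as the log.
result = subprocess.run(
    [os.path.join(tmp, "bin", "python"), "-m", "pip", "install", "six"],
    env=env,
    capture_output=True,
    text=True,
)

print(result.returncode)  # non-zero: pip aborts before resolving the package
print((result.stderr + result.stdout).strip().splitlines()[-1])
```

This matches the log: the virtualenv is created successfully, and the very first `pip install` line dies with `Can not perform a '--user' install`.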
It looks highly related to `ERROR: Can not perform a '--user' install. User site-packages are not visible in this virtualenv.`, but I don't know how to deal with it.

One thing to note: when I run exactly the same DAG on my local machine (macOS) with no code changes (via `airflow dags test my_dag 2021-01-01`), it works fine.

How can I fix this?
Change the option `py_system_site_packages` to `True`. It makes the worker's `site-packages` visible to the virtual environment that the Beam hook creates, and eliminates this error. See the doc for a detailed explanation.
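Applied to the snippet from the question, it is a one-line change (a sketch; only `py_system_site_packages` differs from the original, other arguments stay as in the question):

```python
tfdv_operator = BeamRunPythonPipelineOperator(
    task_id="run_tfdv_on_dataflow",
    py_file="gs://my-datasets/tfdv/beam_pipeline_tfdv.py",
    py_requirements=[
        "apache-beam[gcp]==2.31.0",
        "pyarrow==2.0.0",
        "tensorflow-data-validation==1.1.0",
        "tfx-bsl==1.1.1",
    ],
    py_interpreter="python3.8",
    # Changed from False: the virtualenv created for the pipeline can now
    # see the worker's site-packages, so pip's '--user' install succeeds.
    py_system_site_packages=True,
    runner="DataflowRunner",
    # pipeline_options, dataflow_config, default_pipeline_options unchanged
)
```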