Google Cloud Composer, airflow job cannot recognize installed PyPi packages
I'm working with Airflow on Google Cloud Composer. This is the DAG file:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

dag = DAG(
    'hello_world',
    description='Simple DAG',
    start_date=datetime.now() - timedelta(days=1),
    schedule_interval='@once'
)

hello = BashOperator(
    task_id='hello_world',
    bash_command='python3 /home/airflow/gcs/dags/dependencies/helper.py',
    dag=dag
)
It basically runs helper.py, which lives in the /dags/dependencies/ folder of the Google Cloud Storage DAG bucket. helper.py contains the following code:
from fastavro import writer
import io
import logging

def greetings(name="world"):  # `name` was undefined in the original; made it a parameter
    buffer = io.BytesIO()
    age = 24
    schema = {
        'doc': "cockroach",
        'name': "table",
        'namespace': "cockroach",
        'type': "record",
        'fields': [{'name': 'age', 'type': ['null', 'int']}]
    }
    writer(buffer, schema=schema, records=[{"age": age}])
    logging.info("Hello {}".format(name))
    return "Hello {}".format(name)
It raises the error ModuleNotFoundError: No module named 'fastavro':
[2019-01-11 04:01:57,388] {base_task_runner.py:98} INFO - Subtask: [2019-01-11 04:01:57,386] {bash_operator.py:101} INFO - Traceback (most recent call last):
[2019-01-11 04:01:57,389] {base_task_runner.py:98} INFO - Subtask: [2019-01-11 04:01:57,388] {bash_operator.py:101} INFO - File "/home/airflow/gcs/dags/dependencies/helper.py", line 1, in <module>
[2019-01-11 04:01:57,389] {base_task_runner.py:98} INFO - Subtask: [2019-01-11 04:01:57,388] {bash_operator.py:101} INFO - from fastavro import writer
[2019-01-11 04:01:57,390] {base_task_runner.py:98} INFO - Subtask: [2019-01-11 04:01:57,389] {bash_operator.py:101} INFO - ModuleNotFoundError: No module named 'fastavro'
[2019-01-11 04:01:58,154] {base_task_runner.py:98} INFO - Subtask: [2019-01-11 04:01:58,152] {bash_operator.py:105} INFO - Command exited with return code 1
[2019-01-11 04:01:58,214] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2019-01-11 04:01:58,214] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 6, in <module>
[2019-01-11 04:01:58,214] {base_task_runner.py:98} INFO - Subtask: exec(compile(open(__file__).read(), __file__, 'exec'))
[2019-01-11 04:01:58,215] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/airflow/airflow/bin/airflow", line 27, in <module>
[2019-01-11 04:01:58,215] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2019-01-11 04:01:58,215] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/airflow/airflow/bin/cli.py", line 392, in run
[2019-01-11 04:01:58,215] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
[2019-01-11 04:01:58,215] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
[2019-01-11 04:01:58,216] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
[2019-01-11 04:01:58,216] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/airflow/airflow/models.py", line 1492, in _run_raw_task
[2019-01-11 04:01:58,216] {base_task_runner.py:98} INFO - Subtask: result = task_copy.execute(context=context)
[2019-01-11 04:01:58,219] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/airflow/airflow/operators/bash_operator.py", line 109, in execute
[2019-01-11 04:01:58,219] {base_task_runner.py:98} INFO - Subtask: raise AirflowException("Bash command failed")
[2019-01-11 04:01:58,220] {base_task_runner.py:98} INFO - Subtask: airflow.exceptions.AirflowException: Bash command failed
However, I have already installed fastavro in the PyPI packages of my Google Composer environment:

Does anyone know how to solve this?
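One way to see how this class of problem arises: Composer installs its PyPI packages into the environment's default interpreter, while the `bash_command` above invokes `python3` explicitly, which may be a different interpreter with its own site-packages. A hypothetical diagnostic sketch (my own, not from the original post) that compares the two interpreter paths:

```python
import shutil
import sys

# Composer's "PyPI packages" are installed for the environment's default
# interpreter (Python 2 in older Composer environments). The BashOperator
# runs `python3` explicitly, so if the resolved paths differ, packages
# installed via the Composer UI are not necessarily importable there.
default_interpreter = sys.executable        # interpreter Composer installs into
bash_interpreter = shutil.which("python3")  # interpreter the bash_command runs

print("default interpreter:", default_interpreter)
print("bash_command interpreter:", bash_interpreter)
```

If the two paths point at different interpreters, a package installed through the environment's PyPI package list will not be importable from the `python3` call.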
I've solved it. The cause was mainly a version conflict with python2 (the default for Google Cloud Composer). So I recreated the Google Cloud Composer environment with a python3 environment (an environment's Python version cannot be changed once it has been created: https://cloud.google.com/composer/docs/concepts/python-version). That fixed the problem.
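For reference, recreating the environment on Python 3 can be done from the gcloud CLI. A sketch assuming the Composer 1 `--python-version` flag; the environment name and location below are placeholders:

```shell
# Create a new Composer (v1) environment pinned to Python 3.
# "my-env" and "us-central1" are hypothetical values.
gcloud composer environments create my-env \
    --location us-central1 \
    --python-version 3

# Reinstall the needed PyPI package into the new environment.
gcloud composer environments update my-env \
    --location us-central1 \
    --update-pypi-package fastavro
```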