Deploying a converted Oozie DAG to Google Cloud Composer (Airflow): No module named 'o2a'
I'm using Google's oozie-to-airflow converter to convert some Oozie workflows that run on AWS EMR. I managed to get a first version, but when I try to upload the DAG, Airflow throws this error:
Broken DAG: No module named 'o2a'
I tried to install the PyPI package o2a, both with the command
gcloud composer environments update composer-name --update-pypi-packages-from-file requirements.txt --location location
and from the Google Cloud Console. Both attempts failed.
requirements.txt
o2a==1.0.1
Here is the code:
from airflow import models
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils import dates
from o2a.o2a_libs import functions
from airflow.models import Variable
import subdag_validation
import subdag_generate_reports
CONFIG = {}
JOB_PROPS = {
}
dag_config = Variable.get("coordinator", deserialize_json=True)
cdrPeriod = dag_config["cdrPeriod"]
TASK_MAP = {"validation": ["validation"], "generate_reports": ["generate_reports"] }
TEMPLATE_ENV = {**CONFIG, **JOB_PROPS, "functions": functions, "task_map": TASK_MAP}
with models.DAG(
    "workflow_coordinator",
    schedule_interval=None,  # Change to suit your needs
    start_date=dates.days_ago(0),  # Change to suit your needs
    user_defined_macros=TEMPLATE_ENV,
) as dag:
    validation = SubDagOperator(
        task_id="validation",
        trigger_rule="one_success",
        subdag=subdag_validation.sub_dag(dag.dag_id, "validation", dag.start_date, dag.schedule_interval),
    )
    generate_reports = SubDagOperator(
        task_id="generate_reports",
        trigger_rule="one_success",
        subdag=subdag_generate_reports.sub_dag(
            dag.dag_id,
            "generate_reports",
            dag.start_date,
            dag.schedule_interval,
            {"cdrPeriod": "{{cdrPeriod}}"},
        ),
    )
    validation.set_downstream(generate_reports)
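The error is raised by the line `from o2a.o2a_libs import functions`, so one way to narrow it down is to check which of the DAG's imports the Airflow interpreter can actually locate. This is a minimal sketch using only the standard library (`importlib.util.find_spec`); the helper name `missing_modules` is hypothetical, and running it with the same interpreter as the Composer workers (for example from a throwaway PythonOperator task) is an assumption about how you would use it. Checking `o2a.o2a_libs` and not just `o2a` matters here: it is not confirmed what the `o2a==1.0.1` distribution on PyPI contains, and a top-level `o2a` could be importable while the `o2a_libs` subpackage is not.

```python
import importlib.util


def missing_modules(names):
    """Return the entries of `names` that cannot be located on sys.path.

    Dotted names (e.g. "o2a.o2a_libs") import their parent package first;
    a missing parent is reported as missing instead of raising.
    """
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # ImportError: parent package missing (or not a package).
            # ValueError: module is in sys.modules but has no __spec__.
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__":
    # Module names taken from the DAG's failing import.
    print(missing_modules(["o2a", "o2a.o2a_libs"]))
```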
The o2a documentation has a section on how to deploy the o2a libraries:
https://github.com/GoogleCloudPlatform/oozie-to-airflow#the-o2a-libraries
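A minimal sketch of the idea behind that section, assuming the converter repository keeps the libraries under `o2a/o2a_libs` (which matches the import path in the DAG above) and relying on Airflow adding its DAGs folder to `sys.path`. It stages a copy of `o2a_libs` next to the generated DAG files so the import can resolve without installing `o2a` from PyPI. The function name `stage_o2a_libs` and its directory arguments are hypothetical; check the linked README for the exact layout it expects.

```python
import shutil
from pathlib import Path


def stage_o2a_libs(repo_root: Path, dags_dir: Path) -> Path:
    """Copy <repo_root>/o2a/o2a_libs to <dags_dir>/o2a/o2a_libs.

    Assumed repo layout: o2a/o2a_libs inside an oozie-to-airflow checkout.
    Once <dags_dir> is on sys.path, `from o2a.o2a_libs import functions`
    can resolve against the copied files.
    """
    src = Path(repo_root) / "o2a" / "o2a_libs"
    if not src.is_dir():
        raise FileNotFoundError(f"expected o2a_libs at {src}")
    pkg = Path(dags_dir) / "o2a"
    pkg.mkdir(parents=True, exist_ok=True)
    # Empty __init__.py so 'o2a' is a regular package without pulling in
    # the converter's own modules.
    (pkg / "__init__.py").touch(exist_ok=True)
    dst = pkg / "o2a_libs"
    # dirs_exist_ok needs Python 3.8+; it lets repeated runs overwrite files.
    shutil.copytree(src, dst, dirs_exist_ok=True)
    return dst
```

The staged `o2a` directory would then be uploaded alongside the DAG files, for example with something like `gcloud composer environments storage dags import --environment composer-name --location location --source <staged o2a dir>`.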
With that in place, the DAG started failing on another dependency, lark-parser. Installing it through the Composer PyPI package manager worked.
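With the o2a libraries shipped as DAG files, the PyPI side only needs the remaining dependency. A small sketch that writes a deduplicated requirements file for the `--update-pypi-packages-from-file` flag used in the question; the helper name `write_requirements` is hypothetical, and dropping `o2a` from the file and leaving `lark-parser` unpinned are assumptions, so pin whatever version your o2a checkout was tested with.

```python
from pathlib import Path
from typing import Iterable, List


def write_requirements(path: Path, packages: Iterable[str]) -> List[str]:
    """Write one requirement specifier per line, skipping blanks and duplicates.

    Order of first appearance is preserved; returns the specifiers written.
    """
    unique: List[str] = []
    for spec in packages:
        spec = spec.strip()
        if spec and spec not in unique:
            unique.append(spec)
    Path(path).write_text("".join(f"{spec}\n" for spec in unique))
    return unique
```

For example, `write_requirements(Path("requirements.txt"), ["lark-parser"])` produces a file containing just `lark-parser`, which can then be passed to the same `gcloud composer environments update ... --update-pypi-packages-from-file requirements.txt` command.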