基于数据库连接上可用的行动态创建 DAG

Dynamically Creating DAG based on Row available on DB Connection

我想从数据库 table 查询创建一个动态创建的 DAG。当我尝试从精确数字范围或基于气流设置中的可用对象创建动态创建的 DAG 时,它成功了。但是,当我尝试使用 PostgresHook 并为 table 的每一行创建 DAG 时,每当我在 table 中添加新行时,我都可以看到生成了一个新的 DAG。然而事实证明,我无法在我的气流 Web 服务器 ui 上单击新创建的 DAG。有关更多上下文,我正在使用 Google Cloud Composer。我已经按照 中提到的步骤进行操作。但是它仍然不适用于我的情况。

这是我的代码

from datetime import datetime, timedelta

from airflow import DAG
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import NamedTupleCursor
import os

default_args = {
  "owner": "debug",
  "depends_on_past": False,
  "start_date": datetime(2018, 10, 17),
  "email": ["airflow@airflow.com"],
  "email_on_failure": False,
  "email_on_retry": False,
  "retries": 1,
  "retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}


def create_dag(dag_id,
           schedule,
           default_args):
def hello_world_py(*args):
    print 'Hello from DAG: {}'.format(dag_id)

dag = DAG(dag_id,
          schedule_interval=timedelta(days=1),
          default_args=default_args)

with dag:
    t1 = PythonOperator(
        task_id=dag_id,
        python_callable=hello_world_py,
        dag_id=dag_id)

return dag


dag = DAG("dynamic_yolo_pg_", default_args=default_args,     
        schedule_interval=timedelta(hours=1))

"""
Bahavior:
Create an exact DAG which in turn will create it's own file
https://www.astronomer.io/guides/dynamically-generating-dags/
"""
pg_hook = PostgresHook(postgres_conn_id='some_db')
conn = pg_hook.get_conn()
cursor = conn.cursor(cursor_factory=NamedTupleCursor)
cursor.execute("SELECT * FROM airflow_test_command;")
commands = cursor.fetchall()
for command in commands:
  dag_id = command.id
  schedule = timedelta(days=1)

  id = "dynamic_yolo_" + str(dag_id)

  print id

  globals()[id] = create_dag(id,
                           schedule,
                           default_args)

最佳,

这可以通过使用 [1] 中提到的步骤使用自我管理的 Airflow Webserver 来解决。在你这样做之后,如果你决定在你的自我管理的网络服务器前面添加身份验证,一旦你创建了入口,你的 BackendServices 应该出现在 Google IAP 控制台上,你可以启用 IAP。如果您想以编程方式访问您的气流,您还可以使用 JWT 身份验证,使用服务帐户为您的自我管理的气流网络服务器[2]。

[1] https://cloud.google.com/composer/docs/how-to/managing/deploy-webserver

[2] https://cloud.google.com/iap/docs/authentication-howto