如何在 Google 云平台上将数据库(postgres)连接到 Airflow composer?

How to Connect Database(postgres) to Airflow composer On Google Cloud Platform?

我在我的本地 machine.Dags 上设置了气流,它们以需要访问数据库(postgres)的方式编写。我正在尝试在 Google Cloud [=26= 上设置类似的东西] 我无法在 composer.I 时将数据库连接到 Airflow ??

这是 Link 我完整的 Airflow 文件夹:(这个设置在我的本地机器上工作正常docker)

https://github.com/digvijay13873/airflow-docker.git

我正在使用 GCP composer.Postgres 数据库在 SQL 实例中。我的 Table 创作 Dag 在这里: https://github.com/digvijay13873/airflow-docker/blob/main/dags/tablecreation.py

我应该在我现有的 Dag 中做哪些更改以将其与 SQL 实例中的 postgres 连接。我尝试在主机参数中提供 public postgres 的 IP 地址。

回答您的主要问题,可以通过两种方式在 Cloud Composer 环境中连接来自 GCP 的 SQL 实例:

  • 使用 Public IP
  • 使用云 SQL 代理(推荐):无需授权网络和 SSL 配置即可安全访问

正在使用 Public IP 连接: Postgres:通过 TCP 直接连接 (non-SSL)

os.environ['AIRFLOW_CONN_PUBLIC_POSTGRES_TCP'] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=False".format(**postgres_kwargs)
)

有关详细信息,请参阅 github

使用云 SQL 代理进行连接: 您可以根据 this 文档使用来自 GKE 的 Auth 代理进行连接。

设置 SQL 代理后,您可以使用代理将 Composer 连接到您的 SQL 实例。

示例代码:

SQL = [
    'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
    'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
    'INSERT INTO TABLE_TEST VALUES (0)',
    'CREATE TABLE IF NOT EXISTS TABLE_TEST2 (I INTEGER)',
    'DROP TABLE TABLE_TEST',
    'DROP TABLE TABLE_TEST2',
]

HOME_DIR = expanduser("~")
def get_absolute_path(path):
    if path.startswith("/"):
        return path
    else:
        return os.path.join(HOME_DIR, path)
postgres_kwargs = dict(
    user=quote_plus(GCSQL_POSTGRES_USER),
    password=quote_plus(GCSQL_POSTGRES_PASSWORD),
    public_port=GCSQL_POSTGRES_PUBLIC_PORT,
    public_ip=quote_plus(GCSQL_POSTGRES_PUBLIC_IP),
    project_id=quote_plus(GCP_PROJECT_ID),
    location=quote_plus(GCP_REGION),
    instance=quote_plus(GCSQL_POSTGRES_INSTANCE_NAME_QUERY),
    database=quote_plus(GCSQL_POSTGRES_DATABASE_NAME),
)

os.environ['AIRFLOW_CONN_PROXY_POSTGRES_TCP'] = \
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
    "database_type=postgres&" \
    "project_id={project_id}&" \
    "location={location}&" \
    "instance={instance}&" \
    "use_proxy=True&" \
    "sql_proxy_use_tcp=True".format(**postgres_kwargs)
connection_names = [
    "proxy_postgres_tcp",
]

dag = DAG(
    'con_SQL',
    default_args=default_args,
    description='A DAG that connect to the SQL server.',
    schedule_interval=timedelta(days=1),
)
def print_client(ds, **kwargs):
    client = storage.Client()
    print(client)
print_task = PythonOperator(
    task_id='print_the_client',
    provide_context=True,
    python_callable=print_client,
    dag=dag,
)
for connection_name in connection_names:
    task = CloudSqlQueryOperator(
         gcp_cloudsql_conn_id=connection_name,
         task_id="example_gcp_sql_task_" + connection_name,
         sql=SQL,
         dag=dag
    )
print_task >> task