如何在 Google 云平台上将数据库(postgres)连接到 Airflow composer?
How to Connect Database(postgres) to Airflow composer On Google Cloud Platform?
我在我的本地 machine.Dags 上设置了气流,它们以需要访问数据库(postgres)的方式编写。我正在尝试在 Google Cloud [=26= 上设置类似的东西] 我无法在 composer.I 时将数据库连接到 Airflow ??
这是 Link 我完整的 Airflow 文件夹:(这个设置在我的本地机器上工作正常docker)
https://github.com/digvijay13873/airflow-docker.git
我正在使用 GCP composer.Postgres 数据库在 SQL 实例中。我的 Table 创作 Dag 在这里:
https://github.com/digvijay13873/airflow-docker/blob/main/dags/tablecreation.py
我应该在我现有的 Dag 中做哪些更改以将其与 SQL 实例中的 postgres 连接。我尝试在主机参数中提供 public postgres 的 IP 地址。
回答您的主要问题,可以通过两种方式在 Cloud Composer 环境中连接来自 GCP 的 SQL 实例:
- 使用 Public IP
- 使用云 SQL 代理(推荐):无需授权网络和 SSL 配置即可安全访问
正在使用 Public IP 连接:
Postgres:通过 TCP 直接连接 (non-SSL)
os.environ['AIRFLOW_CONN_PUBLIC_POSTGRES_TCP'] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=False".format(**postgres_kwargs)
)
有关详细信息,请参阅 github
使用云 SQL 代理进行连接: 您可以根据 this 文档使用来自 GKE 的 Auth 代理进行连接。
设置 SQL 代理后,您可以使用代理将 Composer 连接到您的 SQL 实例。
示例代码:
SQL = [
'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
'INSERT INTO TABLE_TEST VALUES (0)',
'CREATE TABLE IF NOT EXISTS TABLE_TEST2 (I INTEGER)',
'DROP TABLE TABLE_TEST',
'DROP TABLE TABLE_TEST2',
]
HOME_DIR = expanduser("~")
def get_absolute_path(path):
if path.startswith("/"):
return path
else:
return os.path.join(HOME_DIR, path)
postgres_kwargs = dict(
user=quote_plus(GCSQL_POSTGRES_USER),
password=quote_plus(GCSQL_POSTGRES_PASSWORD),
public_port=GCSQL_POSTGRES_PUBLIC_PORT,
public_ip=quote_plus(GCSQL_POSTGRES_PUBLIC_IP),
project_id=quote_plus(GCP_PROJECT_ID),
location=quote_plus(GCP_REGION),
instance=quote_plus(GCSQL_POSTGRES_INSTANCE_NAME_QUERY),
database=quote_plus(GCSQL_POSTGRES_DATABASE_NAME),
)
os.environ['AIRFLOW_CONN_PROXY_POSTGRES_TCP'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=postgres&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=True&" \
"sql_proxy_use_tcp=True".format(**postgres_kwargs)
connection_names = [
"proxy_postgres_tcp",
]
dag = DAG(
'con_SQL',
default_args=default_args,
description='A DAG that connect to the SQL server.',
schedule_interval=timedelta(days=1),
)
def print_client(ds, **kwargs):
client = storage.Client()
print(client)
print_task = PythonOperator(
task_id='print_the_client',
provide_context=True,
python_callable=print_client,
dag=dag,
)
for connection_name in connection_names:
task = CloudSqlQueryOperator(
gcp_cloudsql_conn_id=connection_name,
task_id="example_gcp_sql_task_" + connection_name,
sql=SQL,
dag=dag
)
print_task >> task
我在我的本地 machine.Dags 上设置了气流,它们以需要访问数据库(postgres)的方式编写。我正在尝试在 Google Cloud [=26= 上设置类似的东西] 我无法在 composer.I 时将数据库连接到 Airflow ??
这是 Link 我完整的 Airflow 文件夹:(这个设置在我的本地机器上工作正常docker)
https://github.com/digvijay13873/airflow-docker.git
我正在使用 GCP composer.Postgres 数据库在 SQL 实例中。我的 Table 创作 Dag 在这里: https://github.com/digvijay13873/airflow-docker/blob/main/dags/tablecreation.py
我应该在我现有的 Dag 中做哪些更改以将其与 SQL 实例中的 postgres 连接。我尝试在主机参数中提供 public postgres 的 IP 地址。
回答您的主要问题,可以通过两种方式在 Cloud Composer 环境中连接来自 GCP 的 SQL 实例:
- 使用 Public IP
- 使用云 SQL 代理(推荐):无需授权网络和 SSL 配置即可安全访问
正在使用 Public IP 连接: Postgres:通过 TCP 直接连接 (non-SSL)
os.environ['AIRFLOW_CONN_PUBLIC_POSTGRES_TCP'] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=False".format(**postgres_kwargs)
)
有关详细信息,请参阅 github
使用云 SQL 代理进行连接: 您可以根据 this 文档使用来自 GKE 的 Auth 代理进行连接。
设置 SQL 代理后,您可以使用代理将 Composer 连接到您的 SQL 实例。
示例代码:
SQL = [
'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
'INSERT INTO TABLE_TEST VALUES (0)',
'CREATE TABLE IF NOT EXISTS TABLE_TEST2 (I INTEGER)',
'DROP TABLE TABLE_TEST',
'DROP TABLE TABLE_TEST2',
]
HOME_DIR = expanduser("~")
def get_absolute_path(path):
if path.startswith("/"):
return path
else:
return os.path.join(HOME_DIR, path)
postgres_kwargs = dict(
user=quote_plus(GCSQL_POSTGRES_USER),
password=quote_plus(GCSQL_POSTGRES_PASSWORD),
public_port=GCSQL_POSTGRES_PUBLIC_PORT,
public_ip=quote_plus(GCSQL_POSTGRES_PUBLIC_IP),
project_id=quote_plus(GCP_PROJECT_ID),
location=quote_plus(GCP_REGION),
instance=quote_plus(GCSQL_POSTGRES_INSTANCE_NAME_QUERY),
database=quote_plus(GCSQL_POSTGRES_DATABASE_NAME),
)
os.environ['AIRFLOW_CONN_PROXY_POSTGRES_TCP'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=postgres&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=True&" \
"sql_proxy_use_tcp=True".format(**postgres_kwargs)
connection_names = [
"proxy_postgres_tcp",
]
dag = DAG(
'con_SQL',
default_args=default_args,
description='A DAG that connect to the SQL server.',
schedule_interval=timedelta(days=1),
)
def print_client(ds, **kwargs):
client = storage.Client()
print(client)
print_task = PythonOperator(
task_id='print_the_client',
provide_context=True,
python_callable=print_client,
dag=dag,
)
for connection_name in connection_names:
task = CloudSqlQueryOperator(
gcp_cloudsql_conn_id=connection_name,
task_id="example_gcp_sql_task_" + connection_name,
sql=SQL,
dag=dag
)
print_task >> task