Problem creating a connection in Airflow with gcloud

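I'm having trouble creating a connection in Airflow on Cloud Composer using the gcloud command. The problem occurs when I pass the value for extra__google_cloud_platform__keyfile_dict in --conn_extra: when the DAG accesses the key file value, it gets an error. The connection type is google_cloud_platform. An example of the command with placeholder values:

Command: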

gcloud composer environments run COMPOSER --location LOCATION connections -- --add --conn_id=CONNECTION --conn_type=google_cloud_platform --conn_extra="{\"extra__google_cloud_platform__keyfile_dict\":{\"type\":\"service_account\",\"project_id\":\"PROJECT_ID\",\"private_key_id\":\"-----BEGIN PRIVATE KEY-----\VALUE\n-----END PRIVATE KEY-----\n\"}",\"extra__google_cloud_platform__project\":\"PROJECT_ID\",\"extra__google_cloud_platform__scope\":\"SCOPE\"}"

Error log:

[2021-04-05 17:53:46,046] {base_task_runner.py:113} INFO - Job 12399: Subtask Initial_query   File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 216, in _authorize
[2021-04-05 17:53:46,046] {base_task_runner.py:113} INFO - Job 12399: Subtask Initial_query     credentials = self._get_credentials()
[2021-04-05 17:53:46,047] {base_task_runner.py:113} INFO - Job 12399: Subtask Initial_query   File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 164, in _get_credentials
[2021-04-05 17:53:46,047] {base_task_runner.py:113} INFO - Job 12399: Subtask Initial_query     keyfile_dict = json.loads(keyfile_dict)
[2021-04-05 17:53:46,047] {base_task_runner.py:113} INFO - Job 12399: Subtask Initial_query   File "/opt/python3.6/lib/python3.6/json/__init__.py", line 348, in loads
[2021-04-05 17:53:46,048] {base_task_runner.py:113} INFO - Job 12399: Subtask Initial_query     'not {!r}'.format(s.__class__.__name__))
[2021-04-05 17:53:46,048] {base_task_runner.py:113} INFO - Job 12399: Subtask Initial_query TypeError: the JSON object must be str, bytes or bytearray, not 'dict

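I think the problem is in how I escape the quotes (' or "). I tried several variations, but none of them worked. Eventually I looked at the source of gcp_api_base_hook and saw that keyfile_dict is used like this: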

keyfile_dict = json.loads(keyfile_dict)

I don't know what is going wrong. I hope you can help me. Thanks.

Tested solution:

gcloud composer environments run [COMPOSER] \
--project [PROJECT_ID] \
--location [LOCATION] connections -- --add --conn_id=[CONN_ID] \
--conn_type=google_cloud_platform \
--conn_extra='{"extra__google_cloud_platform__project":"[PROJECT_ID]","extra__google_cloud_platform__keyfile_dict": "{\"type\":\"service_account\",\"project_id\":\"[PROJECT_ID]\",\"private_key_id\": \"[PRIVATE_KEY_ID]\",
  \"private_key\": \"-----BEGIN PRIVATE KEY-----\nXXX\nXXX\nXXX\n-----END PRIVATE KEY-----\n\",
  \"client_email\":\"[CLIENT_EMAIL]\",\"client_id\":\"[CLIENT_ID]\",\"auth_uri\": \"[AUTH_URI]\",\"token_uri\":\"[TOKEN_URL]\",\"auth_provider_x509_cert_url\":\"[CERT_URL]\",\"client_x509_cert_url\": \"[x509_CERT_URL]\"}","extra__google_cloud_platform__scope":"[SCOPE]"}'

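Note that every "\n" in the private key must be changed to "\\n". The key file is a JSON string nested inside the outer JSON, so a single "\n" is decoded to a real line break by the outer parse, and the inner json.loads then rejects it. The Airflow class overwrites these values according to its internal definition.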
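
Hand-escaping the nested JSON is easy to get wrong, so here is a minimal Python sketch that produces the --conn_extra value by encoding twice with json.dumps. It is only an illustration: the keyfile dict is a hypothetical stand-in for a real service-account key file (which has more fields), build_conn_extra is a helper name made up for this example, and PROJECT_ID and SCOPE are placeholders. The round trip at the end mimics the json.loads(keyfile_dict) call from the traceback.

```python
import json
import shlex

# Hypothetical stand-in for the contents of a service-account key file;
# a real file has more fields (client_email, client_id, ...).
keyfile = {
    "type": "service_account",
    "project_id": "PROJECT_ID",
    "private_key": "-----BEGIN PRIVATE KEY-----\nXXX\n-----END PRIVATE KEY-----\n",
}


def build_conn_extra(keyfile, project, scope):
    """Return the JSON text for --conn_extra with the key file nested as a string."""
    extra = {
        "extra__google_cloud_platform__project": project,
        # First dumps: the key file becomes a JSON *string*, which is the
        # type that json.loads(keyfile_dict) in the hook accepts.
        "extra__google_cloud_platform__keyfile_dict": json.dumps(keyfile),
        "extra__google_cloud_platform__scope": scope,
    }
    # Second dumps: escapes the inner quotes as \" and the inner \n as \\n.
    return json.dumps(extra)


conn_extra = build_conn_extra(keyfile, "PROJECT_ID", "SCOPE")

# Round trip: parse the outer JSON, then json.loads the nested field.
inner = json.loads(conn_extra)["extra__google_cloud_platform__keyfile_dict"]
assert isinstance(inner, str)
assert json.loads(inner) == keyfile

# shlex.quote wraps the value in single quotes so the shell leaves it untouched.
print("--conn_extra=" + shlex.quote(conn_extra))
```

The printed argument can be pasted in place of the hand-written --conn_extra in the gcloud command. Passing the key file as a nested object instead of a string is what leads to the TypeError in the log, because json.loads then receives a dict.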