Airflow Connection 构建凭据
Airflow Connection build credentials
我写了一个 python 脚本,它在命令行中运行良好('my-analytics.json' 文件存储在与脚本相同的文件夹中。现在我将这个脚本移动到 AirFlow(cloud composer ) 我会将此代码移至 PythonOperator。
注意(上下文):此脚本发送 API 请求以从 Google Analytics 中删除用户。
SCOPES = ['https://www.googleapis.com/auth/analytics.user.deletion']
SERVICE_ACCOUNT_FILE = 'my-analytics.json'
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE,
scopes=SCOPES
)
analytics_client = googleapiclient.discovery.build(
'analytics',
'v3',
credentials=credentials
)
user_deletion_request_resource = analytics_client.userDeletion().userDeletionRequest()
def delete_users(id):
return user_deletion_request_resource.upsert(
body = {
"deletionRequestTime": str(datetime.datetime.now()),
"kind": "analytics#userDeletionRequest",
"id": {
"userId": id,
"type": "CLIENT_ID", # Type of user (APP_INSTANCE_ID,CLIENT_ID or USER_ID)
},
"webPropertyId": "UA-XXXXX-YY" # Web property ID of the form UA-XXXXX-YY.
}
).execute()
我创建了一个这样的 Google Analytics 连接并将 json 存储到 KeyFile JSON 字段中。
我的问题是如何从该连接建立“凭据”?我不知道如何用 AirFlow 连接替换 SERVICE_ACCOUNT_FILE = 'my-analytics.json'
。非常感谢任何 help/guidance。
在 Airflow 中,最好使用钩子和运算符进行编码,这样连接就可以开箱即用,无需所有额外的手动工作。
无论如何,关于你的问题。它类似于答案 here
不同之处在于 Google 连接具有唯一字段,因此您需要使用 GoogleBaseHook。
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
gcp_hook = GoogleCloudBaseHook(gcp_conn_id="your_conn")
scope = gcp_hook._get_field('scope') # or gcp_hook.scope
keyfile = gcp_hook._get_field('keyfile_dict')
keyfile_path = gcp_hook._get_field('key_path')
我写了一个 python 脚本,它在命令行中运行良好('my-analytics.json' 文件存储在与脚本相同的文件夹中。现在我将这个脚本移动到 AirFlow(cloud composer ) 我会将此代码移至 PythonOperator。
注意(上下文):此脚本发送 API 请求以从 Google Analytics 中删除用户。
SCOPES = ['https://www.googleapis.com/auth/analytics.user.deletion']
SERVICE_ACCOUNT_FILE = 'my-analytics.json'
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE,
scopes=SCOPES
)
analytics_client = googleapiclient.discovery.build(
'analytics',
'v3',
credentials=credentials
)
user_deletion_request_resource = analytics_client.userDeletion().userDeletionRequest()
def delete_users(id):
return user_deletion_request_resource.upsert(
body = {
"deletionRequestTime": str(datetime.datetime.now()),
"kind": "analytics#userDeletionRequest",
"id": {
"userId": id,
"type": "CLIENT_ID", # Type of user (APP_INSTANCE_ID,CLIENT_ID or USER_ID)
},
"webPropertyId": "UA-XXXXX-YY" # Web property ID of the form UA-XXXXX-YY.
}
).execute()
我创建了一个这样的 Google Analytics 连接并将 json 存储到 KeyFile JSON 字段中。
我的问题是如何从该连接建立“凭据”?我不知道如何用 AirFlow 连接替换 SERVICE_ACCOUNT_FILE = 'my-analytics.json'
。非常感谢任何 help/guidance。
在 Airflow 中,最好使用钩子和运算符进行编码,这样连接就可以开箱即用,无需所有额外的手动工作。
无论如何,关于你的问题。它类似于答案 here 不同之处在于 Google 连接具有唯一字段,因此您需要使用 GoogleBaseHook。
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
gcp_hook = GoogleCloudBaseHook(gcp_conn_id="your_conn")
scope = gcp_hook._get_field('scope') # or gcp_hook.scope
keyfile = gcp_hook._get_field('keyfile_dict')
keyfile_path = gcp_hook._get_field('key_path')