Airflow Connection 构建凭据

Airflow Connection build credentials

我写了一个 python 脚本,它在命令行中运行良好('my-analytics.json' 文件存储在与脚本相同的文件夹中。现在我将这个脚本移动到 AirFlow(cloud composer ) 我会将此代码移至 PythonOperator。

注意(上下文):此脚本发送 API 请求以从 Google Analytics 中删除用户。

SCOPES = ['https://www.googleapis.com/auth/analytics.user.deletion']
SERVICE_ACCOUNT_FILE = 'my-analytics.json'
credentials = service_account.Credentials.from_service_account_file(
  SERVICE_ACCOUNT_FILE,
  scopes=SCOPES
)
analytics_client = googleapiclient.discovery.build(
  'analytics',
  'v3',
  credentials=credentials
)

user_deletion_request_resource = analytics_client.userDeletion().userDeletionRequest()

def delete_users(id):
    return user_deletion_request_resource.upsert(
    body = {
    "deletionRequestTime": str(datetime.datetime.now()),
    "kind": "analytics#userDeletionRequest",  
    "id": {  
        "userId": id,  
        "type": "CLIENT_ID",  # Type of user (APP_INSTANCE_ID,CLIENT_ID or USER_ID)
    },
    "webPropertyId": "UA-XXXXX-YY"  # Web property ID of the form UA-XXXXX-YY.
    }
    ).execute()

我创建了一个这样的 Google Analytics 连接并将 json 存储到 KeyFile JSON 字段中。

我的问题是如何从该连接建立“凭据”?我不知道如何用 AirFlow 连接替换 SERVICE_ACCOUNT_FILE = 'my-analytics.json'。非常感谢任何 help/guidance。

在 Airflow 中,最好使用钩子和运算符进行编码,这样连接就可以开箱即用,无需所有额外的手动工作。

无论如何,关于你的问题。它类似于答案 here 不同之处在于 Google 连接具有唯一字段,因此您需要使用 GoogleBaseHook。

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
gcp_hook = GoogleCloudBaseHook(gcp_conn_id="your_conn")
scope = gcp_hook._get_field('scope') # or gcp_hook.scope
keyfile = gcp_hook._get_field('keyfile_dict')
keyfile_path = gcp_hook._get_field('key_path')