Google 云作曲家获取气流 webserver_id

Google cloud composer get airflow webserver_id

我有一个包含作曲家实例 my_project_id_cmpsr_id 的 GCP 项目 my_project_id。为了获得 Airflow rest API 的访问权限,我需要检索所谓的 webserver_id。因此,GCP airflow web 服务器 url 的格式为 {webserver-id}.appspot.com,如 documentation

中指定的
# This should be part of your webserver's URL:
# {tenant-project-id}.appspot.com
webserver_id = 'YOUR-TENANT-PROJECT'

是否可以通过 project_idcomposer_idfg8348538536e2df34-fd 一样检索 webserver_id

有可能,您可以转到 Airflow UI,而不是 Admin -> Configuration,然后搜索 base_url 键,这是您的 webserver-id(没有 https://.appspot.com 部分)。

另一种方法是使用以下命令:

gcloud composer environments describe <ENVIRONMENT_NAME> --location <LOCATION>

您将能够看到 config: -> airflowUri 变量。

希望对您有所帮助。

可以通过运行Google docs上的代码示例得到Python需要的值,然后修改为returnwebserver_id和client_id:

#!/usr/bin/env python
# coding: utf-8

import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

project_id = getenv('GCP_PROJECT_ID')
location = getenv('LOCATION')
composer_environment = getenv('COMPOSER_ID')


def get_airflow_details():
    # Authenticate with Google Cloud.
    # See: https://cloud.google.com/docs/authentication/getting-started
    credentials, _ = google.auth.default(
        scopes=['https://www.googleapis.com/auth/cloud-platform'])
    authed_session = google.auth.transport.requests.AuthorizedSession(
        credentials)

    environment_url = (
        'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
        '/environments/{}').format(project_id, location, composer_environment)
    composer_response = authed_session.request('GET', environment_url)
    environment_data = composer_response.json()
    airflow_uri = environment_data['config']['airflowUri']
    # The Composer environment response does not include the IAP client ID.
    # Make a second, unauthenticated HTTP request to the web server to get the
    # redirect URI.
    redirect_response = requests.get(airflow_uri, allow_redirects=False)
    redirect_location = redirect_response.headers['location']

    # Extract the client_id query parameter from the redirect.
    parsed = six.moves.urllib.parse.urlparse(redirect_location)
    query_string = six.moves.urllib.parse.parse_qs(parsed.query)
    client_id = (query_string['client_id'][0])
    
    return client_id, airflow_uri



def main():

    get_airflow_details()
    
    client_id = (get_airflow_details()[0])
    airflow_uri = (get_airflow_details()[1])
    print(client_id)
    print(airflow_uri)

    return 'Script has run without errors !!'

if (__name__ == "__main__"):
    main()