如何从 google cloud composer 调用云函数?

How to invoke a cloud function from google cloud composer?

根据一项要求,我想 call/invoke 来自云作曲家管道内部的云函数,但我找不到太多关于它的信息,我尝试使用 SimpleHTTP airflow 运算符,但出现此错误:

[2021-09-10 10:35:46,649] {taskinstance.py:1503} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1158, in 
_run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1333, in 
_prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1363, in 
_execute_task
result = task_copy.execute(context=context)
File "/home/airflow/gcs/dags/to_gcf.py", line 51, in execute
if not self.response_check(response):
File "/home/airflow/gcs/dags/to_gcf.py", line 83, in <lambda>
response_check=lambda response: False if len(response.json()) == 0 else True,
File "/opt/python3.8/lib/python3.8/site-packages/requests/models.py", line 900, in json
return complexjson.loads(self.text, **kwargs)
File "/opt/python3.8/lib/python3.8/json/__init__.py", line 357, in loads
return _default_decoder.decode(s)
File "/opt/python3.8/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/opt/python3.8/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None

提前致谢!!

我想你在找:https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/functions/index.html#airflow.providers.google.cloud.operators.functions.CloudFunctionInvokeFunctionOperator

请注意,为了在 1.10 中使用,您需要安装 backport provider 包(但我相信它们是默认安装的)并且操作员的版本可能会略有不同,因为 backport 包有一段时间没有发布时间到了。

在气流 2 中

我遇到了与您相同的问题,但我通过研究 Google 的 Airflow 2.0 提供程序包并改为使用 PythonOperator 设法解决了这个问题。

from airflow.providers.google.common.utils import id_token_credentials as id_token_credential_utils
import google.auth.transport.requests
from google.auth.transport.requests import AuthorizedSession

def invoke_cloud_function():

  url = "<your_cloud_function_url>" #the url is also the target audience. 
  request = google.auth.transport.requests.Request()  #this is a request for obtaining the the credentials
  id_token_credentials = id_token_credential_utils.get_default_id_token_credentials(url, request=request) # If your cloud function url has query parameters, remove them before passing to the audience 

  resp = AuthorizedSession(id_token_credentials).request("GET", url=url) # the authorized session object is used to access the Cloud Function

  print(resp.status_code) # should return 200
  print(resp.content) # the body of the HTTP response

因此,调用函数如下:

    task = PythonOperator(task_id="invoke_cf", python_callable=invoke_cloud_function)

据我了解,访问经过身份验证的 HTTP Cloud Function 严格需要基于 ID 令牌的凭据。因此,为了获得所需的 type 凭据,get_default_id_token_credentials() 执行应用程序默认凭据 (ADC) 授权流程,这是一个从环境变量、已知位置获取凭据的过程。或 Compute Engine 元数据服务器。 Composer 应该通过环境变量(可能 GOOGLE_APPLICATION_CREDENTIALS)使关联的服务帐户密钥文件可用。

获得正确类型的凭据后,您可以使用 AuthorizedSessions 对象验证您对云函数的请求。

给定一个在 HTTP 模式下工作的 Cloud Function,最好和最简单的解决方案是使用 SimpleHttpOperator.

调用它

如果云函数需要身份验证,您需要生成身份验证令牌并将其插入header:

import os
import json
import google.oauth2.id_token
import google.auth.transport.requests

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './my-service-account.json'
request = google.auth.transport.requests.Request()
audience = 'https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName'
TOKEN = google.oauth2.id_token.fetch_id_token(request, audience)

MY_TASK_NAME = SimpleHttpOperator(
        task_id= 'MY_TASK_NAME',
        method='POST',
        http_conn_id='my_connection_id',
        endpoint='MyFunctionName',
        headers={'Authorization': f"Bearer {TOKEN}", "Content-Type": "application/json"},
        data=json.dumps({"key": "value"}),  # possible request parameters
        # dag=dag
    )

如果无需身份验证即可触发云功能:

import os
import json

MY_TASK_NAME = SimpleHttpOperator(
        task_id= 'MY_TASK_NAME',
        method='POST',
        http_conn_id='my_connection_id',
        endpoint='MyFunctionName',
        headers={"Content-Type": "application/json"},
        data=json.dumps({"key": "value"}),  # possible request parameters
        # dag=dag
    )

在这两种情况下,请记住在气流连接菜单(管理 --> 连接)中设置 my_connection_id。下图就是一个例子:

您还可以使用 requests python 模块触发 Cloud Function(与之前一样使用或 w/o 身份验证):

import os
import json
import requests
import google.oauth2.id_token
import google.auth.transport.requests

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './my-service-account.json'
request = google.auth.transport.requests.Request()
audience = 'https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName'
TOKEN = google.oauth2.id_token.fetch_id_token(request, audience)

r = requests.post(
    'https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName', 
    headers={'Authorization': f"Bearer {TOKEN}", "Content-Type": "application/json"},
    data=json.dumps({"key": "value"})  # possible request parameters
)
r.status_code, r.reason