How to invoke a Cloud Function from Google Cloud Composer?
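For a requirement, I want to call/invoke a Cloud Function from inside a Cloud Composer pipeline, but I can't find much information about it. I tried using the SimpleHTTP Airflow operator, but I get this error: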
[2021-09-10 10:35:46,649] {taskinstance.py:1503} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1333, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1363, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/gcs/dags/to_gcf.py", line 51, in execute
    if not self.response_check(response):
  File "/home/airflow/gcs/dags/to_gcf.py", line 83, in <lambda>
    response_check=lambda response: False if len(response.json()) == 0 else True,
  File "/opt/python3.8/lib/python3.8/site-packages/requests/models.py", line 900, in json
    return complexjson.loads(self.text, **kwargs)
  File "/opt/python3.8/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/opt/python3.8/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/opt/python3.8/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
Thanks in advance!
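The traceback ends with a JSONDecodeError raised by response.json() inside the response_check lambda, which suggests the response body was empty or not JSON (possibly an error page from a rejected request, though that is an assumption, since the body itself is not shown). A minimal sketch of a more defensive check follows; has_json_payload is a hypothetical helper name, and the only thing it assumes about the response object is a json() method:

```python
def has_json_payload(response):
    """Return True only if the body parses as JSON and is non-empty."""
    try:
        payload = response.json()
    except ValueError:
        # json.JSONDecodeError subclasses ValueError, so an empty or
        # non-JSON body fails the check instead of crashing the task.
        return False
    # len() only applies to sized payloads such as dicts, lists, and strings.
    return hasattr(payload, "__len__") and len(payload) > 0
```

It could be passed as response_check=has_json_payload in place of the lambda, so a non-JSON reply fails the check with a clear result rather than an exception inside the check itself.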
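Note that to use this in Airflow 1.10 you need to install the backport provider packages (though I believe they are installed by default), and the operator version may differ slightly, because the backport packages have not been released for some time.
In Airflow 2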
I ran into the same problem as you, but I managed to solve it by studying Google's provider package for Airflow 2.0 and using a PythonOperator instead.
from airflow.providers.google.common.utils import id_token_credentials as id_token_credential_utils
import google.auth.transport.requests
from google.auth.transport.requests import AuthorizedSession

def invoke_cloud_function():
    # The function URL is also the target audience of the ID token.
    url = "<your_cloud_function_url>"
    # Transport request used to obtain the credentials.
    request = google.auth.transport.requests.Request()
    # If the URL has query parameters, remove them before passing it as the audience.
    id_token_credentials = id_token_credential_utils.get_default_id_token_credentials(url, request=request)
    # The authorized session attaches the ID token to the call to the Cloud Function.
    resp = AuthorizedSession(id_token_credentials).request("GET", url=url)
    print(resp.status_code)  # should be 200
    print(resp.content)  # the body of the HTTP response
So the function is invoked as follows:
from airflow.operators.python import PythonOperator
task = PythonOperator(task_id="invoke_cf", python_callable=invoke_cloud_function)
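As I understand it, accessing an authenticated HTTP Cloud Function strictly requires ID-token-based credentials. To obtain credentials of the required type, get_default_id_token_credentials() runs the Application Default Credentials (ADC) flow, which looks for credentials in environment variables, in well-known locations, or on the Compute Engine metadata server. Composer should make the associated service account key file available through an environment variable (probably GOOGLE_APPLICATION_CREDENTIALS).
Once you have the right type of credentials, you can use an AuthorizedSession object to authenticate your requests to the Cloud Function.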
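The code comment above says to remove query parameters from the URL before using it as the audience. A minimal sketch of that step using only the standard library; audience_from_url is a hypothetical helper name and the URL is a placeholder:

```python
from urllib.parse import urlsplit, urlunsplit

def audience_from_url(url):
    """Drop the query string and fragment so the URL can serve as the audience."""
    parts = urlsplit(url)
    return urlunsplit((parts.scheme, parts.netloc, parts.path, "", ""))

full_url = "https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName?key=value"
print(audience_from_url(full_url))
# prints https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName
```

The full URL, query string included, would still be the one passed to the session's request call; only the audience needs the trimmed form.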
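Given a Cloud Function working in HTTP mode, the simplest solution is to invoke it with SimpleHttpOperator.
If the Cloud Function requires authentication, you need to generate an authentication token and insert it into the header: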
import os
import json
import google.oauth2.id_token
import google.auth.transport.requests
from airflow.providers.http.operators.http import SimpleHttpOperator

# Path to the service account key file used to mint the ID token.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './my-service-account.json'
request = google.auth.transport.requests.Request()
# The audience is the URL of the Cloud Function being called.
audience = 'https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName'
TOKEN = google.oauth2.id_token.fetch_id_token(request, audience)

MY_TASK_NAME = SimpleHttpOperator(
    task_id='MY_TASK_NAME',
    method='POST',
    http_conn_id='my_connection_id',
    endpoint='MyFunctionName',
    headers={'Authorization': f"Bearer {TOKEN}", "Content-Type": "application/json"},
    data=json.dumps({"key": "value"}),  # possible request parameters
    # dag=dag
)
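When an authenticated call is rejected, it can help to look at the claims inside the token. An ID token is a JWT, so its payload can be decoded for debugging; the sketch below does not verify the signature and must not be used for authorization decisions. decode_jwt_claims is a hypothetical helper name:

```python
import base64
import json

def decode_jwt_claims(token):
    """Decode the payload segment of a JWT without verifying its signature."""
    payload_b64 = token.split(".")[1]
    # JWT segments are base64url-encoded with the padding stripped; restore it.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))
```

Calling decode_jwt_claims(TOKEN) should return a dict whose "aud" entry matches the function URL and whose "exp" entry is the expiry time in seconds since the epoch. Since TOKEN above is created when the DAG file is loaded rather than when the task runs, checking "exp" is one way to see whether an expired token is behind a failed call.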
If the Cloud Function can be triggered without authentication:
import json
from airflow.providers.http.operators.http import SimpleHttpOperator

MY_TASK_NAME = SimpleHttpOperator(
    task_id='MY_TASK_NAME',
    method='POST',
    http_conn_id='my_connection_id',
    endpoint='MyFunctionName',
    headers={"Content-Type": "application/json"},
    data=json.dumps({"key": "value"}),  # possible request parameters
    # dag=dag
)
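In both cases, remember to set up my_connection_id in the Airflow connections menu (Admin --> Connections). The image below is an example:
[image: example Airflow connection for my_connection_id]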
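As a rough guide to what goes into the connection, the sketch below splits a function URL into the part that would go in the connection's host field and the part passed as endpoint. It assumes an HTTP-type connection whose host holds the base URL including the scheme, with the operator appending endpoint to it; split_function_url is a hypothetical helper name:

```python
from urllib.parse import urlsplit

def split_function_url(url):
    """Split a function URL into (connection host, operator endpoint)."""
    parts = urlsplit(url)
    host = f"{parts.scheme}://{parts.netloc}"  # value for the connection host
    endpoint = parts.path.lstrip("/")          # value for the endpoint argument
    return host, endpoint

host, endpoint = split_function_url(
    "https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName"
)
print(host)      # prints https://mylocation-myprojectname.cloudfunctions.net
print(endpoint)  # prints MyFunctionName
```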
You can also trigger the Cloud Function with the Python requests module (with or without authentication, as before):
import os
import json
import requests
import google.oauth2.id_token
import google.auth.transport.requests

# Path to the service account key file used to mint the ID token.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './my-service-account.json'
request = google.auth.transport.requests.Request()
audience = 'https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName'
TOKEN = google.oauth2.id_token.fetch_id_token(request, audience)

r = requests.post(
    'https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName',
    headers={'Authorization': f"Bearer {TOKEN}", "Content-Type": "application/json"},
    data=json.dumps({"key": "value"}),  # possible request parameters
)
print(r.status_code, r.reason)