在 Airflow DAG 中调用 Google 云函数
Calling a Google cloud function within Airflow DAG
我有一个正在运行的 google 云函数,我正在尝试从 Airflow DAG 调用它。
到目前为止我尝试过的是使用 SimpleHttpOperator:
MY_TASK_NAME = SimpleHttpOperator(
task_id= "MY_TASK_NAME",
method='POST',
http_conn_id='http_default',
endpoint='https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName',
data=({"schema": schema, "table": table}),
headers={"Content-Type": "application/json"},
xcom_push=False
)
但是查看日志,它说找不到资源:
{base_task_runner.py:98} INFO - Subtask:
The requested URL /<a href="https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName" rel="nofollow noreferrer">https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName</a>
was not found on this server. That’s all we know.
我还注意到它实际上发布到 https://www.google.com/ + 我给的 url:
Sending 'POST' to url: https://www.google.com/https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName
调用函数的正确方法是什么?
谢谢
这是因为您正在使用 http_conn_id='http_default'
。
http_default
连接如下所示:
如果您检查“主机”字段,它会显示 http://www.google.com/
。
创建连接类型为 HTTP
的新连接或修改 http_default
连接并将主机更改为 https://us-central1-myprojectname.cloudfunctions.net/
然后将任务中的 endpoint
字段更新为:
MY_TASK_NAME = SimpleHttpOperator(
task_id= "MY_TASK_NAME",
method='POST',
http_conn_id='http_default',
endpoint='MyFunctionName',
data=({"schema": schema, "table": table}),
headers={"Content-Type": "application/json"},
xcom_push=False
)
编辑:在 URL 末尾添加 /
如@kaxil 所述,您需要先更改 http 连接。然后,您需要能够发送正确的身份验证以调用云功能。下面的 link 有一个分步指南,通过子类化 SimpleHttpOperator
来做到这一点
附带说明一下,Google 应该会使这个过程更加清晰。想要从 Google Cloud Composer 触发 Google Cloud Function (gcf) 是完全合理的。 documentation 关于如何将 http 触发器发送到 gcf 的文档包括 Cloud Scheduler、Cloud Tasks、Cloud Pub/Sub 和许多其他文档,但不包括 Cloud Composer
我有一个正在运行的 google 云函数,我正在尝试从 Airflow DAG 调用它。
到目前为止我尝试过的是使用 SimpleHttpOperator:
MY_TASK_NAME = SimpleHttpOperator(
task_id= "MY_TASK_NAME",
method='POST',
http_conn_id='http_default',
endpoint='https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName',
data=({"schema": schema, "table": table}),
headers={"Content-Type": "application/json"},
xcom_push=False
)
但是查看日志,它说找不到资源:
{base_task_runner.py:98} INFO - Subtask:
The requested URL
/<a href="https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName" rel="nofollow noreferrer">https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName</a>
was not found on this server. That’s all we know.
我还注意到它实际上发布到 https://www.google.com/ + 我给的 url:
Sending 'POST' to url: https://www.google.com/https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName
调用函数的正确方法是什么? 谢谢
这是因为您正在使用 http_conn_id='http_default'
。
http_default
连接如下所示:
如果您检查“主机”字段,它会显示 http://www.google.com/
。
创建连接类型为 HTTP
的新连接或修改 http_default
连接并将主机更改为 https://us-central1-myprojectname.cloudfunctions.net/
然后将任务中的 endpoint
字段更新为:
MY_TASK_NAME = SimpleHttpOperator(
task_id= "MY_TASK_NAME",
method='POST',
http_conn_id='http_default',
endpoint='MyFunctionName',
data=({"schema": schema, "table": table}),
headers={"Content-Type": "application/json"},
xcom_push=False
)
编辑:在 URL 末尾添加 /
如@kaxil 所述,您需要先更改 http 连接。然后,您需要能够发送正确的身份验证以调用云功能。下面的 link 有一个分步指南,通过子类化 SimpleHttpOperator
附带说明一下,Google 应该会使这个过程更加清晰。想要从 Google Cloud Composer 触发 Google Cloud Function (gcf) 是完全合理的。 documentation 关于如何将 http 触发器发送到 gcf 的文档包括 Cloud Scheduler、Cloud Tasks、Cloud Pub/Sub 和许多其他文档,但不包括 Cloud Composer