在 Airflow DAG 中调用 Google 云函数

Calling a Google cloud function within Airflow DAG

我有一个正在运行的 google 云函数,我正在尝试从 Airflow DAG 调用它。

到目前为止我尝试过的是使用 SimpleHttpOperator:

MY_TASK_NAME = SimpleHttpOperator(
        task_id= "MY_TASK_NAME",
        method='POST',
        http_conn_id='http_default',
        endpoint='https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName',
        data=({"schema": schema, "table": table}),
        headers={"Content-Type": "application/json"},
        xcom_push=False
    )

但是查看日志,它说找不到资源:

{base_task_runner.py:98} INFO - Subtask:

The requested URL /<a href="https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName" rel="nofollow noreferrer">https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName</a> was not found on this server. That’s all we know.

我还注意到它实际上发布到 https://www.google.com/ + 我给的 url:

Sending 'POST' to url: https://www.google.com/https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName

调用函数的正确方法是什么? 谢谢

这是因为您正在使用 http_conn_id='http_default'

http_default 连接如下所示:

如果您检查“主机”字段,它会显示 http://www.google.com/

创建连接类型为 HTTP 的新连接或修改 http_default 连接并将主机更改为 https://us-central1-myprojectname.cloudfunctions.net/

然后将任务中的 endpoint 字段更新为:

MY_TASK_NAME = SimpleHttpOperator(
        task_id= "MY_TASK_NAME",
        method='POST',
        http_conn_id='http_default',
        endpoint='MyFunctionName',
        data=({"schema": schema, "table": table}),
        headers={"Content-Type": "application/json"},
        xcom_push=False
    )

编辑:在 URL 末尾添加 /

如@kaxil 所述,您需要先更改 http 连接。然后,您需要能够发送正确的身份验证以调用云功能。下面的 link 有一个分步指南,通过子类化 SimpleHttpOperator

来做到这一点

https://medium.com/google-cloud/calling-cloud-composer-to-cloud-functions-and-back-again-securely-8e65d783acce


附带说明一下,Google 应该会使这个过程更加清晰。想要从 Google Cloud Composer 触发 Google Cloud Function (gcf) 是完全合理的。 documentation 关于如何将 http 触发器发送到 gcf 的文档包括 Cloud Scheduler、Cloud Tasks、Cloud Pub/Sub 和许多其他文档,但不包括 Cloud Composer