编排小型 python 任务的最佳实践(主要是在 BigQuery 中执行 SQL)

Best practise to orchestrate small python task (mostly executing SQL in BigQuery)

我们在 GCP 中使用 pubsub 和云功能来协调我们的数据工作流。

我们的工作流程是这样的:

workflow_gcp

pubsub1 和 pubsub3 可以在不同时间触发(例如:凌晨 1 点和凌晨 4 点)。它们每天从外部源(我们的 ETL、Talend)触发。

我们的云函数基本上在 BigQuery 中执行 SQL。

这运行良好,但我们必须手动创建一个编排数据库来记录功能开始和结束的时间(以回答问题 "function X executed ok?")。编排逻辑与我们的业务逻辑强耦合,因为我们的云功能必须知道之前必须执行哪些功能,以及之后触发什么 pubsub。

因此我们正在寻找一种将编排逻辑和业务逻辑分开的解决方案。

我发现 composer (airflow) 可能是一个解决方案,但是:

那么在我们的案例中最佳实践是什么?

感谢您的帮助

您可以使用 Cloud Composer (Airflow) 并仍然重新利用大部分现有设置。

首先,您可以保留所有现有的 Cloud Functions 并在 Airflow 中使用 HTTP triggers (or others you prefer) to trigger them in Airflow. The only change you will need to do is to implement a PubSub Sensor,因此它会触发您的 Cloud Functions(从而确保您可以控制整个过程的编排)。

您的解决方案将是一个 Airflow DAG,它根据 PubSub 消息触发 Cloud Functions,如果函数成功则向 Airflow 报告,然后,如果两者都成功,则触发第三个 Cloud Functions一个 HTTP 触发器或类似的东西,都是一样的。

最后一点,不是很直观。 Airflow 并不意味着 运行 作业本身,它旨在协调和管理依赖项。您使用由 Airflow 触发的 Cloud Functions 这一事实并不是反模式,实际上是一种最佳实践。

在您的情况下,您可以 100% 重写一些内容并使用 BigQuery 运算符,因为您不做任何处理,只是触发 queries/jobs,但概念仍然正确,最佳实践正在利用 Airflow 确保事情按您需要的时间和顺序发生,而不是自己处理这些事情。 (希望有意义)

作为气流的替代方案,我会查看 "argo workflows" -> https://github.com/argoproj/argo

它没有作曲家的成本开销,尤其是对于较小的工作量。

我会:

创建了一个从外部工具读取 pubsub 消息并将其部署到 kubernetes 的部署。

基于消息执行了一个工作流。工作流中的每个步骤都可以是一个云函数,打包在 docker.

(我会用 kubernetes 作业替换云函数,然后由工作流触发。)

在 kuberentes 中用 docker 和 运行 打包一个云函数非常简单。

存在带有 gsutil/bq/gcloud 的预构建 docker 图像,因此您可以创建 bash 使用 "bq" 命令行执行 bigquery 中内容的脚本。