数据流作业成功后如何执行云功能?

How to execute a Cloud Function after Dataflow job has succeeded?

我只想在 Dataflow 作业执行成功时触发 Cloud Function。

如果 Dataflow 作业失败,不应触发 Cloud Function。

我是 运行 使用数据流模板(jdbc 到 BigQuery)的数据流作业 UI。

没有选项可以在作业执行后触发任何 Cloud Functions 或其他东西。另外,我无法更改模板代码。云函数的触发方式是什么?

目前还没有为此构建它的功能,但我可以提出一个解决方法。

  • 转到 Cloud Logging 并转到高级过滤器(或新的 UI)
  • 输入这个过滤器
resource.type="dataflow_step"
textPayload="Worker pool stopped."
  • 然后创建一个接收器(操作->在新的UI中创建接收器)
  • 选择 PubSub 作为接收器目的地(为此创建一个新主题)
  • 保存
  • 然后 link 您的 Cloud Functions 的主题(通过对 HTTP 触发的 Cloud Functions 的推送订阅,或通过主题触发的 Cloud Functions。

像这样,每次数据流作业结束时,都会向 PubSub 发布一条新消息,并触发您的 Cloud Function。

您可能会发现这个 feature request 在数据流完成时触发 Cloud Functions 很有用。

可以从命令行确定 Dataflow job 是失败还是成功。您可以列出职位并查看其当前状态,例如查看单个职位,您可以 运行:

gcloud beta dataflow jobs describe <JOB_ID>

JDBC BigQuery 模板 source code can be found on GitHub. You could always create a custom template 如果您需要进行任何特定更改。

我以前也做过类似的事情。

https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/PipelineResult.html

PipelineResult result = pipeline.run();
State s = result.waitUntilFinish();
if (s.compareTo(State.DONE) == 0)
    return callCloudFunction();

然后您可以将您的云函数设置为由 http 请求触发。 https://cloud.google.com/functions/docs/calling/http