使用 Java/Scala 代码获取数据流作业状态

Get dataflow job status using Java/Scala code

我正在尝试以编程方式找出数据流作业的状态,同时 运行。例如,我想轮询批处理/流式处理作业,一旦完成,我想触发下一个作业。我可以利用 Java/Scala API 来完成此任务吗?我已经尝试使用 com.google.api.services.dataflow.Dataflow$Projects$Locations$Jobs 并且能够检索 JobMetrics,但我正在尝试获取更多信息,例如作业输入和输出指标,以确定流作业是否已处理所有数据。

[已编辑 - 基于后续问题]。 这些是我在 运行:

时尝试从数据流作业中获取的信号

While checking manually the streaming jobs, we conclude that the job is "done" when we see that the throughput has come down to zero and hasn't changed for a few minutes. Would like to do the same while automating this approach.

有什么可以帮助我的吗?

抱歉,我是 Java 文盲,但关于您需要的详细信息,我可以向您指出 Dataflow API 中的哪个端点来获取它们。我的示例是通过使用 curl 向数据流 API 发送 HTTP 请求来完成的。

  1. 作业状态(运行、完成、失败)

    • 使用 Jobs endpoint, you can submit a request and it should return the currentState. See currentState 查看作业可用的所有状态。这包括 JOB_STATE_DONE、JOB_STATE_RUNNING、JOB_STATE_FAILED 等
    • 这可以用于批处理和流处理。
    • 在Java中如果我没记错的话你可以使用Job class and method getCurrentState
    • 示例:

  2. 摄取/写入的元素

    • 您提到您已经检索到 JobMetrics。
    • JobMetrics endpoint returns lists of MetricStructuredName objects。这些对象包含生成指标的源。您可以在“步数”中找到您的指标。
    • 例如,如果您有一个 ReadFromPubSub 步骤,您可以找到它读取的消息数。请参阅下面的屏幕截图,了解对象的外观以及它在 UI.
    • 上的外观
    • 您只需要适当地过滤掉数据,就可以看到您需要的详细信息。
    • 在Java中应该来自JobMetrics class and method getMetrics
    • 示例(scalar包含“添加的元素”的值值不一样,因为它是连续流式传输的):
  3. 数据新鲜度

    • 无法使用数据流提取数据新鲜度 API。
    • 数据是使用 Cloud Monitoring 创建的,如果您需要此数据,则需要学习 Monitoring API。请参阅 sample codes 了解如何使用 API。
    • 要查看创建的指标,请转至 GCP Console > Monitoring > Dashboards > GCP > Dataflow
    • 您应该会看到 Dataflow 作业及其指标的列表。

我希望这些信息可以为您指明正确的方向,并能够使用 Java 来实现它。