使用 Java/Scala 代码获取数据流作业状态
Get dataflow job status using Java/Scala code
我正在尝试以编程方式找出数据流作业的状态,同时 运行。例如,我想轮询批处理/流式处理作业,一旦完成,我想触发下一个作业。我可以利用 Java/Scala API 来完成此任务吗?我已经尝试使用 com.google.api.services.dataflow.Dataflow$Projects$Locations$Jobs
并且能够检索 JobMetrics,但我正在尝试获取更多信息,例如作业输入和输出指标,以确定流作业是否已处理所有数据。
[已编辑 - 基于后续问题]。
这些是我在 运行:
时尝试从数据流作业中获取的信号
对于批处理作业:仅当作业为 running
、completed
或 failed
时。基于这些,我可以决定是等待、触发下一个作业还是不进行下一个作业分别。
对于 Streaming Jobs:由于我想判断这个 Job 是否处理了所有必要的数据,也许我可以尝试获取以下信息:
1. Data Freshness
2. No. of Elements ingested/ read (Input Metrics)
3. Elements written (Output Metrics)
While checking manually the streaming jobs, we conclude that the job
is "done" when we see that the throughput has come down to zero and
hasn't changed for a few minutes. Would like to do the same while
automating this approach.
有什么可以帮助我的吗?
抱歉,我是 Java 文盲,但关于您需要的详细信息,我可以向您指出 Dataflow API 中的哪个端点来获取它们。我的示例是通过使用 curl 向数据流 API 发送 HTTP 请求来完成的。
作业状态(运行、完成、失败)
- 使用 Jobs endpoint, you can submit a request and it should return the
currentState
. See currentState 查看作业可用的所有状态。这包括 JOB_STATE_DONE、JOB_STATE_RUNNING、JOB_STATE_FAILED 等
- 这可以用于批处理和流处理。
- 在Java中如果我没记错的话你可以使用Job class and method getCurrentState
- 示例:
摄取/写入的元素
- 您提到您已经检索到 JobMetrics。
- JobMetrics endpoint returns lists of MetricStructuredName objects。这些对象包含生成指标的源。您可以在“步数”中找到您的指标。
- 例如,如果您有一个
ReadFromPubSub
步骤,您可以找到它读取的消息数。请参阅下面的屏幕截图,了解对象的外观以及它在 UI. 上的外观
- 您只需要适当地过滤掉数据,就可以看到您需要的详细信息。
- 在Java中应该来自JobMetrics class and method getMetrics
- 示例(
scalar
包含“添加的元素”的值值不一样,因为它是连续流式传输的):
数据新鲜度
- 无法使用数据流提取数据新鲜度 API。
- 数据是使用 Cloud Monitoring 创建的,如果您需要此数据,则需要学习 Monitoring API。请参阅 sample codes 了解如何使用 API。
- 要查看创建的指标,请转至 GCP Console > Monitoring > Dashboards > GCP > Dataflow
- 您应该会看到 Dataflow 作业及其指标的列表。
我希望这些信息可以为您指明正确的方向,并能够使用 Java 来实现它。
我正在尝试以编程方式找出数据流作业的状态,同时 运行。例如,我想轮询批处理/流式处理作业,一旦完成,我想触发下一个作业。我可以利用 Java/Scala API 来完成此任务吗?我已经尝试使用 com.google.api.services.dataflow.Dataflow$Projects$Locations$Jobs
并且能够检索 JobMetrics,但我正在尝试获取更多信息,例如作业输入和输出指标,以确定流作业是否已处理所有数据。
[已编辑 - 基于后续问题]。 这些是我在 运行:
时尝试从数据流作业中获取的信号对于批处理作业:仅当作业为
running
、completed
或failed
时。基于这些,我可以决定是等待、触发下一个作业还是不进行下一个作业分别。对于 Streaming Jobs:由于我想判断这个 Job 是否处理了所有必要的数据,也许我可以尝试获取以下信息:
1. Data Freshness 2. No. of Elements ingested/ read (Input Metrics) 3. Elements written (Output Metrics)
While checking manually the streaming jobs, we conclude that the job is "done" when we see that the throughput has come down to zero and hasn't changed for a few minutes. Would like to do the same while automating this approach.
有什么可以帮助我的吗?
抱歉,我是 Java 文盲,但关于您需要的详细信息,我可以向您指出 Dataflow API 中的哪个端点来获取它们。我的示例是通过使用 curl 向数据流 API 发送 HTTP 请求来完成的。
作业状态(运行、完成、失败)
- 使用 Jobs endpoint, you can submit a request and it should return the
currentState
. See currentState 查看作业可用的所有状态。这包括 JOB_STATE_DONE、JOB_STATE_RUNNING、JOB_STATE_FAILED 等 - 这可以用于批处理和流处理。
- 在Java中如果我没记错的话你可以使用Job class and method getCurrentState
- 示例:
- 使用 Jobs endpoint, you can submit a request and it should return the
摄取/写入的元素
- 您提到您已经检索到 JobMetrics。
- JobMetrics endpoint returns lists of MetricStructuredName objects。这些对象包含生成指标的源。您可以在“步数”中找到您的指标。
- 例如,如果您有一个
ReadFromPubSub
步骤,您可以找到它读取的消息数。请参阅下面的屏幕截图,了解对象的外观以及它在 UI. 上的外观
- 您只需要适当地过滤掉数据,就可以看到您需要的详细信息。
- 在Java中应该来自JobMetrics class and method getMetrics
- 示例(
scalar
包含“添加的元素”的值值不一样,因为它是连续流式传输的):
数据新鲜度
- 无法使用数据流提取数据新鲜度 API。
- 数据是使用 Cloud Monitoring 创建的,如果您需要此数据,则需要学习 Monitoring API。请参阅 sample codes 了解如何使用 API。
- 要查看创建的指标,请转至 GCP Console > Monitoring > Dashboards > GCP > Dataflow
- 您应该会看到 Dataflow 作业及其指标的列表。
我希望这些信息可以为您指明正确的方向,并能够使用 Java 来实现它。