如何使用 API 在 GCP 数据流中检索当前工作人员计数

How to retrieve current workers count for job in GCP dataflow using API

有谁知道是否有可能让当前工作人员计算 GCP 数据流中 运行 的活跃工作?

我无法使用 google API 提供的方法来完成。

我能得到的一件事是CurrentVcpuCount,但这不是我需要的。

提前致谢!

Dataflow 作业中的当前工作人员数量显示在 autoscaling 下的消息日志中。例如,我做了一个快速的工作,当在我的云 Shell:

中显示作业日志时,我收到了以下消息
INFO:root:2019-01-28T16:42:33.173Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently running step(s).
INFO:root:2019-01-28T16:43:02.166Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently running step(s).
INFO:root:2019-01-28T16:43:05.385Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
INFO:root:2019-01-28T16:43:05.433Z: JOB_MESSAGE_DETAILED: Workers have started successfully.

现在,您可以使用 projects.jobs.messages.list 方法在数据流 API 中查询这些消息,并将 minimumImportance 参数设置为 JOB_MESSAGE_BASIC

您将收到类似于以下内容的回复:

...
"autoscalingEvents": [
    {...} //other events
    {

      "currentNumWorkers": "1",
      "eventType": "CURRENT_NUM_WORKERS_CHANGED",
      "description": {
          "messageText": "(fcfef6769cff802b): Worker pool started.",
          "messageKey": "POOL_STARTUP_COMPLETED"
      },
      "time": "2019-01-28T16:43:02.130129051Z",
      "workerPool": "Regular"
    },

要扩展它,您可以创建一个 python 脚本来解析响应,并且只从列表 autoscalingEvents 的最后一个元素中获取参数 currentNumWorkers,以了解什么是作业中的最后(因此是当前)工人数。

请注意,如果没有此参数,则表示工人数为零。

编辑:

我做了一个快速的 python 脚本,它使用我上面提到的 API 从消息日志中检索当前的工人数量:

from google.oauth2 import service_account
import googleapiclient.discovery


credentials = service_account.Credentials.from_service_account_file(
    filename='PATH-TO-SERVICE-ACCOUNT-KEY/key.json',
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
service = googleapiclient.discovery.build(
            'dataflow', 'v1b3', credentials=credentials)




project_id="MY-PROJECT-ID"
job_id="DATAFLOW-JOB-ID"

messages=service.projects().jobs().messages().list(
            projectId=project_id,
            jobId=job_id
        ).execute()

try:
    print("Current number of workers is "+messages['autoscalingEvents'][-1]['currentNumWorkers'])
except:
    print("Current number of workers is 0")

一些注意事项:

  • 范围是您引用的服务帐户密钥所需的权限(在 from_service_account_file 函数中),以便调用 API。需要此行才能向 API 进行身份验证。您可以使用 this list 中的任何一个,为了方便我,我只使用了具有 project/owner 权限的服务帐户密钥。

  • 如果您想了解有关 Python API 客户端库的更多信息,请查看 this documentation, and this samples.

<script async src="//pagead2.googlesyndication.com/pagead/js/adsbygoogle.js"></script>
<script>
     (adsbygoogle = window.adsbygoogle || []).push({
          google_ad_client: "ca-pub-5513132861824326",
          enable_page_level_ads: true
     });
</script>