Can I make Flex Template jobs take less than 10 minutes before they start to process data?

I am using the Terraform resource google_dataflow_flex_template_job to deploy a Dataflow Flex Template job:

resource "google_dataflow_flex_template_job" "streaming_beam" {
  provider                = google-beta
  name                    = "streaming-beam"
  container_spec_gcs_path = module.streaming_beam_flex_template_file[0].fully_qualified_path
  parameters = {
    "input_subscription"    = google_pubsub_subscription.ratings[0].id
    "output_table"          = "${var.project}:beam_samples.streaming_beam_sql"
    "service_account_email" = data.terraform_remote_state.state.outputs.sa.email
    "network"               = google_compute_network.network.name
    "subnetwork"            = "regions/${google_compute_subnetwork.subnet.region}/subnetworks/${google_compute_subnetwork.subnet.name}"
  }
}

Everything works, but the job seems to be using flexible resource scheduling (FlexRS) mode even though I did not request it. I say this because the job takes roughly ten minutes to start, and during that time its status is QUEUED, which I believe applies only to FlexRS jobs.

FlexRS mode is fine for production scenarios, but I am still developing my Dataflow job, and FlexRS is very inconvenient during development because it takes about 10 minutes before I can see the effect of any change I make, however small.

Enabling FlexRS states:

To enable a FlexRS job, use the following pipeline option: --flexRSGoal=COST_OPTIMIZED, where the cost-optimized goal means that the Dataflow service chooses any available discounted resources or --flexRSGoal=SPEED_OPTIMIZED, where it optimizes for lower execution time.
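For reference, the quoted flag is an ordinary Beam pipeline option passed on the command line when submitting directly with the Python SDK; a sketch, assuming a hypothetical pipeline file and placeholder project/bucket names:

```shell
python my_pipeline.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=europe-west1 \
  --temp_location=gs://my-bucket/tmp \
  --flexRSGoal=COST_OPTIMIZED
```

With a Flex Template the equivalent value is supplied through the template's parameters instead, as shown in the Terraform resource below.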

I then found the following statement:

To turn on FlexRS, you must specify the value COST_OPTIMIZED to allow the Dataflow service to choose any available discounted resources.

Specifying pipeline execution parameters > Setting other Cloud Dataflow pipeline options

I interpreted this to mean that flexrs_goal=SPEED_OPTIMIZED would turn *off* FlexRS mode. So I changed the definition of the google_dataflow_flex_template_job resource to:

resource "google_dataflow_flex_template_job" "streaming_beam" {
  provider                = google-beta
  name                    = "streaming-beam"
  container_spec_gcs_path = module.streaming_beam_flex_template_file[0].fully_qualified_path
  parameters = {
    "input_subscription"    = google_pubsub_subscription.ratings[0].id
    "output_table"          = "${var.project}:beam_samples.streaming_beam_sql"
    "service_account_email" = data.terraform_remote_state.state.outputs.sa.email
    "network"               = google_compute_network.network.name
    "subnetwork"            = "regions/${google_compute_subnetwork.subnet.region}/subnetworks/${google_compute_subnetwork.subnet.name}"
    "flexrs_goal"           = "SPEED_OPTIMIZED" 
  }
}

(note the added "flexrs_goal" = "SPEED_OPTIMIZED"), but it seems to make no difference. The Dataflow UI confirms that I have set SPEED_OPTIMIZED:

Yet the job still takes too long (9 minutes 46 seconds) to start processing data, and it remains in state=QUEUED the whole time:

2021-01-17 19:49:19.021 GMT Starting GCE instance, launcher-2021011711491611239867327455334861, to launch the template.
...
...
2021-01-17 19:59:05.381 GMT Starting 1 workers in europe-west1-d...
2021-01-17 19:59:12.256 GMT VM, launcher-2021011711491611239867327455334861, stopped.

I then tried explicitly setting flexrs_goal=COST_OPTIMIZED just to see whether it made any difference, but that only produced an error:

"The workflow could not be created. Causes: The workflow could not be created due to misconfiguration. The experimental feature flexible_resource_scheduling is not supported for streaming jobs. Contact Google Cloud Support for further help. "

That makes sense. My job is indeed a streaming job, and the documentation does state that FlexRS applies only to batch jobs:

This page explains how to enable Flexible Resource Scheduling (FlexRS) for autoscaled batch pipelines in Dataflow.

https://cloud.google.com/dataflow/docs/guides/flexrs

But this does not solve my problem. As I said above, if I deploy with flexrs_goal=SPEED_OPTIMIZED the job sits in state=QUEUED for almost ten minutes, yet as far as I know QUEUED applies only to FlexRS jobs:

Therefore, after you submit a FlexRS job, your job displays an ID and a Status of Queued

https://cloud.google.com/dataflow/docs/guides/flexrs#delayed_scheduling

So I am confused:

  1. Why is my job QUEUED even though it is not a FlexRS job?
  2. Why does my job take almost ten minutes before it starts processing any data?
  3. How can I reduce the time it takes for my job to start processing data, so that I get faster feedback during development/testing?

Update: I dug into the logs to see what was happening during those 9 minutes 46 seconds. These two consecutive log messages are 7 minutes 23 seconds apart:

2021-01-17 19:51:03.381 GMT "INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/dataflow/template/requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']"
2021-01-17 19:58:26.459 GMT "INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi"

Whatever happened between those two log entries is the main cause of the long time spent in state=QUEUED. Does anyone know what it might be?

As the log messages above show, the launcher spends that time downloading and building the packages in requirements.txt from source. To avoid this, pre-install the apache-beam module and your requirements.txt dependencies in the Flex Template's Dockerfile:

RUN pip install -U apache-beam==<version>
RUN pip install -U -r ./requirements.txt
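For context, those two RUN lines belong in the Flex Template's Dockerfile, so the dependencies are baked into the launcher image instead of being fetched when the job starts. A minimal sketch, assuming the standard Python template launcher base image; the file names and Beam version are placeholders to adjust for your project:

```dockerfile
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/dataflow/template
WORKDIR ${WORKDIR}

COPY requirements.txt .
COPY streaming_beam.py .

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"

# Pre-install the SDK and the pipeline's dependencies into the image so the
# launcher VM does not download and build them at job-start time.
RUN pip install -U apache-beam==2.27.0
RUN pip install -U -r ./requirements.txt
```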

During development, I prefer to use the DirectRunner to get the fastest possible feedback.
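For example, the same pipeline can be run locally with the DirectRunner; a sketch, where the file name is hypothetical and the option names mirror the template's parameters (local runs against Pub/Sub and BigQuery still need application default credentials):

```shell
python streaming_beam.py \
  --runner=DirectRunner \
  --streaming \
  --input_subscription=projects/my-project/subscriptions/ratings \
  --output_table=my-project:beam_samples.streaming_beam_sql
```

Changes to the pipeline code then take effect immediately on the next run, with no template build or launcher VM involved.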