Can I make flex template jobs take less than 10 minutes before they start to process data?
I am using the Terraform resource google_dataflow_flex_template_job to deploy a Dataflow Flex template job:
resource "google_dataflow_flex_template_job" "streaming_beam" {
  provider                = google-beta
  name                    = "streaming-beam"
  container_spec_gcs_path = module.streaming_beam_flex_template_file[0].fully_qualified_path
  parameters = {
    "input_subscription"    = google_pubsub_subscription.ratings[0].id
    "output_table"          = "${var.project}:beam_samples.streaming_beam_sql"
    "service_account_email" = data.terraform_remote_state.state.outputs.sa.email
    "network"               = google_compute_network.network.name
    "subnetwork"            = "regions/${google_compute_subnetwork.subnet.region}/subnetworks/${google_compute_subnetwork.subnet.name}"
  }
}
Everything works, but the job appears to be using flexible resource scheduling (FlexRS) mode even though I never requested it. I say this because the job takes about ten minutes to start, and during that time its state is QUEUED, which I thought applied only to FlexRS jobs.
FlexRS mode would be fine for production scenarios, but I am still developing my Dataflow job, and FlexRS is very inconvenient during development because it takes around ten minutes before I can see the effect of any change I make, however small.
Enabling FlexRS states:
To enable a FlexRS job, use the following pipeline option:
--flexRSGoal=COST_OPTIMIZED, where the cost-optimized goal means that the Dataflow service chooses any available discounted resources or
--flexRSGoal=SPEED_OPTIMIZED, where it optimizes for lower execution time.
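For comparison, the same goal can also be passed when launching a Flex template directly with gcloud; in the sketch below the bucket path and region are placeholders, not values from my setup:

```
# Hypothetical launch command; the template path and region are illustrative.
gcloud dataflow flex-template run "streaming-beam" \
  --template-file-gcs-location "gs://BUCKET/templates/streaming-beam.json" \
  --region "europe-west1" \
  --parameters flexrs_goal=SPEED_OPTIMIZED
```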
I then found the following statement:
To turn on FlexRS, you must specify the value COST_OPTIMIZED to allow the Dataflow service to choose any available discounted resources.
in Specifying pipeline execution parameters > Setting other Cloud Dataflow pipeline options.
I interpreted this to mean that flexrs_goal=SPEED_OPTIMIZED would turn FlexRS mode off, so I changed the definition of my google_dataflow_flex_template_job resource to:
resource "google_dataflow_flex_template_job" "streaming_beam" {
  provider                = google-beta
  name                    = "streaming-beam"
  container_spec_gcs_path = module.streaming_beam_flex_template_file[0].fully_qualified_path
  parameters = {
    "input_subscription"    = google_pubsub_subscription.ratings[0].id
    "output_table"          = "${var.project}:beam_samples.streaming_beam_sql"
    "service_account_email" = data.terraform_remote_state.state.outputs.sa.email
    "network"               = google_compute_network.network.name
    "subnetwork"            = "regions/${google_compute_subnetwork.subnet.region}/subnetworks/${google_compute_subnetwork.subnet.name}"
    "flexrs_goal"           = "SPEED_OPTIMIZED"
  }
}
(note the addition of "flexrs_goal" = "SPEED_OPTIMIZED") but it doesn't seem to make any difference. The Dataflow UI confirms that I have set SPEED_OPTIMIZED, yet the job still takes far too long (9 minutes 46 seconds) to start processing data, and it sat in state=QUEUED the whole time:
2021-01-17 19:49:19.021 GMT  Starting GCE instance, launcher-2021011711491611239867327455334861, to launch the template.
...
...
2021-01-17 19:59:05.381 GMT  Starting 1 workers in europe-west1-d...
2021-01-17 19:59:12.256 GMT  VM, launcher-2021011711491611239867327455334861, stopped.
I then tried explicitly setting flexrs_goal=COST_OPTIMIZED just to see whether it made any difference, but that only produced an error:
"The workflow could not be created. Causes: The workflow could not be
created due to misconfiguration. The experimental feature
flexible_resource_scheduling is not supported for streaming jobs.
Contact Google Cloud Support for further help. "
That makes sense. My job is indeed a streaming job, and the documentation does state that FlexRS only applies to batch jobs:
This page explains how to enable Flexible Resource Scheduling (FlexRS) for autoscaled batch pipelines in Dataflow.
https://cloud.google.com/dataflow/docs/guides/flexrs
But that doesn't resolve my problem. As I said above, if I deploy with flexrs_goal=SPEED_OPTIMIZED the job sits in state=QUEUED for almost ten minutes, yet as far as I can tell QUEUED only applies to FlexRS jobs:
Therefore, after you submit a FlexRS job, your job displays an ID and a Status of Queued
https://cloud.google.com/dataflow/docs/guides/flexrs#delayed_scheduling
So I'm confused:
- Why is my job getting queued even though it isn't a FlexRS job?
- Why does my job take almost ten minutes before it starts processing any data?
- How can I reduce the time my job takes before it starts processing data, so that I get faster feedback during development/testing?
Update: I dug deeper into the logs to find out what was happening during those 9 minutes 46 seconds. These two consecutive log messages are 7 minutes 23 seconds apart:
2021-01-17 19:51:03.381 GMT
"INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/dataflow/template/requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']"
2021-01-17 19:58:26.459 GMT
"INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi"
Whatever happened between those two log records is the main contributor to the long time spent in state=QUEUED. Does anyone know what the cause might be?
As described in , you need to extract the apache-beam module from your requirements.txt and install it when the container image is built:
RUN pip install -U apache-beam==<version>
RUN pip install -U -r ./requirements.txt
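In context, a minimal launcher Dockerfile might look like the sketch below. This is an assumption-laden illustration, not your actual Dockerfile: the base image and FLEX_TEMPLATE_PYTHON_* variables follow Google's Flex template conventions, the file names follow the streaming_beam sample, and `<version>` is a placeholder you must fill in:

```dockerfile
# Sketch of a Flex template launcher image; file names are illustrative.
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/dataflow/template
WORKDIR ${WORKDIR}

COPY requirements.txt .
COPY streaming_beam.py .

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"

# Pre-install the SDK and the pipeline's dependencies at image build time,
# so the launcher does not have to download and build them at job start.
RUN pip install -U apache-beam==<version>
RUN pip install -U -r ./requirements.txt
```

With the dependencies baked into the image, the slow `pip download ... --no-binary :all:` step observed in the logs should no longer dominate startup time.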
During development, I prefer to use the DirectRunner to get the fastest possible feedback.