在 GCP Console 上创建数据流作业期间提供参数时出错
Error when providing arguments during Dataflow Job creation on GCP Console
自 2021 年 10 月 05 日/06 日起,我的 GCP 数据流模板文件正在获取模板创建期间提供的参数值(当我 运行 本地计算机中的 .py 文件以创建模板文件时在 GCP 存储上),并且没有获取基于同一模板文件创建作业期间提供的参数。如果我在模板创建期间没有提供任何值,他们会假设一个 RuntimeValueProvider(当不使用 args 的默认值时),但不是在创建作业期间提供的值。
创建作业期间提供的参数存储在 Dataflow 作业会话中。如果我打开作业,转到右侧栏并打开“管道选项”,那么在创建作业期间提供的正确值就在那里,但它们没有达到代码。
我运行在 GCP 控制台中以经典方式从模板中获取我的代码:
gcloud dataflow jobs run JOB_NAME --gcs-location gs://LOCATION/TEMPLATE/FILE --region REGION --project PROJ_NAME --worker-machine-type MACHINE_TYPE --parameters PARAM_1=PARAM_1_VALUE,PARAM_2=PARAM_2_VALUE
我使用的是 SDK 2.32.0,在内部代码中我使用的是“parser.add_value_provider_argument”而不是“parser.add_argument”。但是我使用“parser.add_argument”对其进行了测试,但没有成功。对于这两者,我的代码都假设了我 运行 .py 文件时的参数值。
示例 1
import apache_beam.io.gcp.gcsfilesystem as gcs
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--PARAM_1',
type=str)
parser.add_value_provider_argument('--PARAM_2',
type=str)
beam_options = PipelineOptions()
args = beam_options.view_as(MyOptions)
# Some business operations with args that are always assuming the values provided during template creation
options = {'project': PROJECT,
'runner': 'DataflowRunner',
'region': REGION,
'staging_location': 'gs://{}/temp'.format(BUCKET),
'temp_location': 'gs://{}/temp'.format(BUCKET),
'template_location': 'gs://{}/template/batch_poc'.format(BUCKET)}
pipeline_options = PipelineOptions.from_dictionary(options)
with beam.Pipeline(options = pipeline_options) as p:
lines = (p
| beam...
)
示例 2(与示例 1 相同,但使用默认值)
# ... same as example 1
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--PARAM_1',
default="test1",
type=str)
parser.add_value_provider_argument('--PARAM_2',
default="test2",
type=str)
# ... same as example 1
在所有情况下,我在创建作业期间提供的参数都将被忽略。
案例一:
当 运行ning 示例 1 在本地机器上没有 args(如下面的 python 命令)并且在 GCP 控制台上 运行ning 其模板时,两种情况:args 和没有 args(如下面的第二个命令). PARAM_1_VALUE 和 PARAM_2_VALUE 中的值相同:RuntimeValueProvider(...)
LOCALHOST> python3 code.py
GCP> gcloud dataflow jobs run ...
OR
GCP> gcloud dataflow jobs run ... --parameters PARAM_1=another_test_1,PARAM_2=another_test_2
案例二:
当 运行ning 示例 1 在本地计算机上使用 args(如下面的 python 命令)并且在 GCP 控制台上 运行ning 其模板时,两种情况:args 和无 args(如下面的第二个命令) . PARAM_1_VALUE 和 PARAM_2_VALUE 中的值与模板创建期间传递的值相同:another_test_{value} 而不是 another_another_test_{value}
LOCALHOST> python3 code.py --PARAM_1 another_test_1 --PARAM_2 another_test_2
GCP> gcloud dataflow jobs run ...
OR
GCP> gcloud dataflow jobs run ... --parameters PARAM_1=another_another_test_1,PARAM_2=another_another_test_2
案例三:
当 运行ning 示例 2 在本地机器上没有 args(如下面的 python 命令)并且在 GCP 控制台上 运行ning 其模板时,两种情况:args 和没有 args(如下面的第二个命令). PARAM_1_VALUE和PARAM_2_VALUE中的值为默认值。
LOCALHOST> python3 code.py
GCP> gcloud dataflow jobs run ...
OR
GCP> gcloud dataflow jobs run ... --parameters PARAM_1=another_test_1,PARAM_2=another_test_2
案例四:
当 运行ning 示例 2 在本地计算机上使用 args(如下面的 python 命令)并在 GCP 控制台上 运行ning 其模板时,两种情况:args 和无 args(如下面的第二个命令) .它的发生与情况 2 相同。
注意:我更新了两个库:apache-beam 和 apache-beam[gcp]
请注意,“--PARAM_1_VALUE”、“--PARAM_1_VALUE”...值不能在管道构造期间使用。根据 1:
“RuntimeValueProvider 是默认的 ValueProvider 类型。 RuntimeValueProvider 允许您的管道接受仅在管道执行期间可用的值。该值在管道构建期间不可用,因此您不能使用该值更改管道的工作流图。”
文档显示在 ValueProvider 参数上使用 .get() 方法允许您在运行时检索值并在您的函数中使用它。直译:
“要在您自己的函数中使用运行时参数值,请更新函数以使用 ValueProvider 参数。”
此处,ValueProvider.get() 在运行时方法 DoFn.process() 内部调用。
基于此,我建议您按照 2 更改代码并重试。
自 2021 年 10 月 05 日/06 日起,我的 GCP 数据流模板文件正在获取模板创建期间提供的参数值(当我 运行 本地计算机中的 .py 文件以创建模板文件时在 GCP 存储上),并且没有获取基于同一模板文件创建作业期间提供的参数。如果我在模板创建期间没有提供任何值,他们会假设一个 RuntimeValueProvider(当不使用 args 的默认值时),但不是在创建作业期间提供的值。
创建作业期间提供的参数存储在 Dataflow 作业会话中。如果我打开作业,转到右侧栏并打开“管道选项”,那么在创建作业期间提供的正确值就在那里,但它们没有达到代码。
我运行在 GCP 控制台中以经典方式从模板中获取我的代码:
gcloud dataflow jobs run JOB_NAME --gcs-location gs://LOCATION/TEMPLATE/FILE --region REGION --project PROJ_NAME --worker-machine-type MACHINE_TYPE --parameters PARAM_1=PARAM_1_VALUE,PARAM_2=PARAM_2_VALUE
我使用的是 SDK 2.32.0,在内部代码中我使用的是“parser.add_value_provider_argument”而不是“parser.add_argument”。但是我使用“parser.add_argument”对其进行了测试,但没有成功。对于这两者,我的代码都假设了我 运行 .py 文件时的参数值。
示例 1
import apache_beam.io.gcp.gcsfilesystem as gcs
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--PARAM_1',
type=str)
parser.add_value_provider_argument('--PARAM_2',
type=str)
beam_options = PipelineOptions()
args = beam_options.view_as(MyOptions)
# Some business operations with args that are always assuming the values provided during template creation
options = {'project': PROJECT,
'runner': 'DataflowRunner',
'region': REGION,
'staging_location': 'gs://{}/temp'.format(BUCKET),
'temp_location': 'gs://{}/temp'.format(BUCKET),
'template_location': 'gs://{}/template/batch_poc'.format(BUCKET)}
pipeline_options = PipelineOptions.from_dictionary(options)
with beam.Pipeline(options = pipeline_options) as p:
lines = (p
| beam...
)
示例 2(与示例 1 相同,但使用默认值)
# ... same as example 1
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--PARAM_1',
default="test1",
type=str)
parser.add_value_provider_argument('--PARAM_2',
default="test2",
type=str)
# ... same as example 1
在所有情况下,我在创建作业期间提供的参数都将被忽略。
案例一: 当 运行ning 示例 1 在本地机器上没有 args(如下面的 python 命令)并且在 GCP 控制台上 运行ning 其模板时,两种情况:args 和没有 args(如下面的第二个命令). PARAM_1_VALUE 和 PARAM_2_VALUE 中的值相同:RuntimeValueProvider(...)
LOCALHOST> python3 code.py
GCP> gcloud dataflow jobs run ...
OR
GCP> gcloud dataflow jobs run ... --parameters PARAM_1=another_test_1,PARAM_2=another_test_2
案例二: 当 运行ning 示例 1 在本地计算机上使用 args(如下面的 python 命令)并且在 GCP 控制台上 运行ning 其模板时,两种情况:args 和无 args(如下面的第二个命令) . PARAM_1_VALUE 和 PARAM_2_VALUE 中的值与模板创建期间传递的值相同:another_test_{value} 而不是 another_another_test_{value}
LOCALHOST> python3 code.py --PARAM_1 another_test_1 --PARAM_2 another_test_2
GCP> gcloud dataflow jobs run ...
OR
GCP> gcloud dataflow jobs run ... --parameters PARAM_1=another_another_test_1,PARAM_2=another_another_test_2
案例三: 当 运行ning 示例 2 在本地机器上没有 args(如下面的 python 命令)并且在 GCP 控制台上 运行ning 其模板时,两种情况:args 和没有 args(如下面的第二个命令). PARAM_1_VALUE和PARAM_2_VALUE中的值为默认值。
LOCALHOST> python3 code.py
GCP> gcloud dataflow jobs run ...
OR
GCP> gcloud dataflow jobs run ... --parameters PARAM_1=another_test_1,PARAM_2=another_test_2
案例四: 当 运行ning 示例 2 在本地计算机上使用 args(如下面的 python 命令)并在 GCP 控制台上 运行ning 其模板时,两种情况:args 和无 args(如下面的第二个命令) .它的发生与情况 2 相同。
注意:我更新了两个库:apache-beam 和 apache-beam[gcp]
请注意,“--PARAM_1_VALUE”、“--PARAM_1_VALUE”...值不能在管道构造期间使用。根据 1:
“RuntimeValueProvider 是默认的 ValueProvider 类型。 RuntimeValueProvider 允许您的管道接受仅在管道执行期间可用的值。该值在管道构建期间不可用,因此您不能使用该值更改管道的工作流图。”
文档显示在 ValueProvider 参数上使用 .get() 方法允许您在运行时检索值并在您的函数中使用它。直译:
“要在您自己的函数中使用运行时参数值,请更新函数以使用 ValueProvider 参数。”
此处,ValueProvider.get() 在运行时方法 DoFn.process() 内部调用。
基于此,我建议您按照 2 更改代码并重试。