Sagemaker 管道中的 SparkJarProcessor
SparkJarProcessor in Sagemaker Pipeline
我想要 运行 Sagemaker Pipeline 中的 SparkJarProcessor。在我创建 SparkJarProcessor 的实例之后,当我只是 run
处理器时,我可以使用 submit_app
和 submit_class
参数指定我想要执行的 jar 和 class run
方法。例如,
processor.run(
submit_app="my.jar",
submit_class="program.to.run",
arguments=['--my_arg', "my_arg"],
configuration=my_config,
spark_event_logs_s3_uri=log_path
)
如果我想 运行 它作为管道中的一个步骤,我可以给 ProcessingStep 什么参数?根据 this documentation,您可以在处理器上调用 get_run_args 以“ 获取在 ProcessingStep 中使用 SparkJarProcessor 时所需的规范化输入、输出和参数”,但是当我 运行 是这样的时候,
processor.get_run_args(
submit_app="my.jar",
submit_class="program.to.run",
arguments=['--my_arg', "my_arg"],
configuration=my_config,
spark_event_logs_s3_uri=log_path
)
我的输出如下所示:
RunArgs(code='my.jar', inputs=[<sagemaker.processing.ProcessingInput object at 0x7fc53284a090>], outputs=[<sagemaker.processing.ProcessingOutput object at 0x7fc532845ed0>], arguments=['--my_arg', 'my_arg'])
"program.to.run" 不是输出的一部分。所以,假设code
是指定jar,那么submit_class
的规范化版本是什么?
当在 SparkJarProcessor 上调用 get_run_args
或 run
时,submit_class
is used to set a property on the processor itself 这就是为什么您在 get_run_args
中看不到它的原因输出。
处理器 属性 将在流水线定义生成期间用于将 ContainerEntrypoint 参数设置为 CreateProcessingJob
。
示例:
run_args = spark_processor.get_run_args(
submit_app="my.jar",
submit_class="program.to.run",
arguments=[]
)
step_process = ProcessingStep(
name="SparkJarProcessStep",
processor=spark_processor,
inputs=run_args.inputs,
outputs=run_args.outputs,
code=run_args.code
)
pipeline = Pipeline(
name="myPipeline",
parameters=[],
steps=[step_process],
)
definition = json.loads(pipeline.definition())
definition
definition
的输出:
...
'Steps': [{'Name': 'SparkJarProcessStep',
'Type': 'Processing',
'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
'InstanceCount': 2,
'VolumeSizeInGB': 30}},
'AppSpecification': {'ImageUri': '153931337802.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:2.4-cpu',
'ContainerEntrypoint': ['smspark-submit',
'--class',
'program.to.run',
'--local-spark-event-logs-dir',
'/opt/ml/processing/spark-events/',
'/opt/ml/processing/input/code/my.jar']},
...
我想要 运行 Sagemaker Pipeline 中的 SparkJarProcessor。在我创建 SparkJarProcessor 的实例之后,当我只是 run
处理器时,我可以使用 submit_app
和 submit_class
参数指定我想要执行的 jar 和 class run
方法。例如,
processor.run(
submit_app="my.jar",
submit_class="program.to.run",
arguments=['--my_arg', "my_arg"],
configuration=my_config,
spark_event_logs_s3_uri=log_path
)
如果我想 运行 它作为管道中的一个步骤,我可以给 ProcessingStep 什么参数?根据 this documentation,您可以在处理器上调用 get_run_args 以“ 获取在 ProcessingStep 中使用 SparkJarProcessor 时所需的规范化输入、输出和参数”,但是当我 运行 是这样的时候,
processor.get_run_args(
submit_app="my.jar",
submit_class="program.to.run",
arguments=['--my_arg', "my_arg"],
configuration=my_config,
spark_event_logs_s3_uri=log_path
)
我的输出如下所示:
RunArgs(code='my.jar', inputs=[<sagemaker.processing.ProcessingInput object at 0x7fc53284a090>], outputs=[<sagemaker.processing.ProcessingOutput object at 0x7fc532845ed0>], arguments=['--my_arg', 'my_arg'])
"program.to.run" 不是输出的一部分。所以,假设code
是指定jar,那么submit_class
的规范化版本是什么?
当在 SparkJarProcessor 上调用 get_run_args
或 run
时,submit_class
is used to set a property on the processor itself 这就是为什么您在 get_run_args
中看不到它的原因输出。
处理器 属性 将在流水线定义生成期间用于将 ContainerEntrypoint 参数设置为 CreateProcessingJob
。
示例:
run_args = spark_processor.get_run_args(
submit_app="my.jar",
submit_class="program.to.run",
arguments=[]
)
step_process = ProcessingStep(
name="SparkJarProcessStep",
processor=spark_processor,
inputs=run_args.inputs,
outputs=run_args.outputs,
code=run_args.code
)
pipeline = Pipeline(
name="myPipeline",
parameters=[],
steps=[step_process],
)
definition = json.loads(pipeline.definition())
definition
definition
的输出:
...
'Steps': [{'Name': 'SparkJarProcessStep',
'Type': 'Processing',
'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
'InstanceCount': 2,
'VolumeSizeInGB': 30}},
'AppSpecification': {'ImageUri': '153931337802.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:2.4-cpu',
'ContainerEntrypoint': ['smspark-submit',
'--class',
'program.to.run',
'--local-spark-event-logs-dir',
'/opt/ml/processing/spark-events/',
'/opt/ml/processing/input/code/my.jar']},
...