Sagemaker 管道中的 SparkJarProcessor

SparkJarProcessor in Sagemaker Pipeline

我想要 运行 Sagemaker Pipeline 中的 SparkJarProcessor。在我创建 SparkJarProcessor 的实例之后,当我只是 run 处理器时,我可以使用 submit_appsubmit_class 参数指定我想要执行的 jar 和 class run 方法。例如,

processor.run(
    submit_app="my.jar",
    submit_class="program.to.run",
    arguments=['--my_arg', "my_arg"],
    configuration=my_config,
    spark_event_logs_s3_uri=log_path
)

如果我想 运行 它作为管道中的一个步骤,我可以给 ProcessingStep 什么参数?根据 this documentation,您可以在处理器上调用 get_run_args 以“ 获取在 ProcessingStep 中使用 SparkJarProcessor 时所需的规范化输入、输出和参数”,但是当我 运行 是这样的时候,

processor.get_run_args(
    submit_app="my.jar", 
    submit_class="program.to.run",
    arguments=['--my_arg', "my_arg"],
    configuration=my_config,
    spark_event_logs_s3_uri=log_path
)

我的输出如下所示:

RunArgs(code='my.jar', inputs=[<sagemaker.processing.ProcessingInput object at 0x7fc53284a090>], outputs=[<sagemaker.processing.ProcessingOutput object at 0x7fc532845ed0>], arguments=['--my_arg', 'my_arg'])

"program.to.run" 不是输出的一部分。所以,假设code是指定jar,那么submit_class的规范化版本是什么?

当在 SparkJarProcessor 上调用 get_run_argsrun 时,submit_class is used to set a property on the processor itself 这就是为什么您在 get_run_args 中看不到它的原因输出。

处理器 属性 将在流水线定义生成期间用于将 ContainerEntrypoint 参数设置为 CreateProcessingJob

示例:

run_args = spark_processor.get_run_args(
    submit_app="my.jar",
    submit_class="program.to.run",
    arguments=[]
)

step_process = ProcessingStep(
    name="SparkJarProcessStep",
    processor=spark_processor,
    inputs=run_args.inputs,
    outputs=run_args.outputs,
    code=run_args.code
)

pipeline = Pipeline(
    name="myPipeline",
    parameters=[],
    steps=[step_process],
)

definition = json.loads(pipeline.definition())
definition

definition的输出:

...
'Steps': [{'Name': 'SparkJarProcessStep',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': 2,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '153931337802.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:2.4-cpu',
     'ContainerEntrypoint': ['smspark-submit',
      '--class',
      'program.to.run',
      '--local-spark-event-logs-dir',
      '/opt/ml/processing/spark-events/',
      '/opt/ml/processing/input/code/my.jar']},
...