如何将 stepfunction executionId 解析为 SageMaker 批量转换作业名称?

How to parse stepfunction executionId to SageMaker batch transform job name?

我创建了一个 stepfunction,下面这个状态机的定义 (step-function.json) 在 terraform 中使用(使用本页中的语法:https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTransformJob.html

如果我第一次执行这个状态机,它会创建一个名为 example-jobname 的 SageMaker 批量转换作业,但我需要每天执行这个状态机,然后它会给我错误 "error": "SageMaker.ResourceInUseException", "cause": "Job name must be unique within an AWS account and region, and a job with this name already exists

原因是因为作业名称被硬编码为example-jobname所以如果状态机在第一次执行后执行,由于作业名称需要唯一,任务会失败,只是想知道我如何添加一个字符串(类似于作业名称末尾的 ExecutionId)。这是我尝试过的:

  1. 我在 json 文件的 Parameters 部分添加了 "executionId.$": "States.Format('somestring {}', $$.Execution.Id)",但是当我执行任务时出现错误 "error": "States.Runtime", "cause": "An error occurred while executing the state 'SageMaker CreateTransformJob' (entered at the event id #2). The Parameters '{\"BatchStrategy\":\"SingleRecord\",..............\"executionId\":\"somestring arn:aws:states:us-east-1:xxxxx:execution:xxxxx-state-machine:xxxxxxxx72950\"}' could not be used to start the Task: [The field \"executionId\" is not supported by Step Functions]"}

  2. 我修改了json文件中的jobname为"TransformJobName": "example-jobname-States.Format('somestring {}', $$.Execution.Id)",,当我执行statemachine时,报错:"error": "SageMaker.AmazonSageMakerException", "cause": "2 validation errors detected: Value 'example-jobname-States.Format('somestring {}', $$.Execution.Id)' at 'transformJobName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}; Value 'example-jobname-States.Format('somestring {}', $$.Execution.Id)' at 'transformJobName' failed to satisfy constraint: Member must have length less than or equal to 63

我真的运行没主意了,有人可以帮忙吗?非常感谢。

因此,根据 documentation,我们应该按以下格式传递参数

        "Parameters": {
            "ModelName.$": "$$.Execution.Name",  
            ....
        },

如果你仔细看看你的定义中缺少的东西,所以你的步进函数定义应该如下所示:

任一

      "TransformJobName.$": "$$.Execution.Id",

      "TransformJobName.$: "States.Format('mytransformjob{}', $$.Execution.Id)"

完整的状态机定义:

    {
        "Comment": "Defines the statemachine.",
        "StartAt": "Generate Random String",
        "States": {
            "Generate Random String": {
                "Type": "Task",
                "Resource": "arn:aws:lambda:eu-central-1:1234567890:function:randomstring",
                "ResultPath": "$.executionid",
                "Parameters": {
                "executionId.$": "$$.Execution.Id"
                },
                "Next": "SageMaker CreateTransformJob"
            },
        "SageMaker CreateTransformJob": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createTransformJob.sync",
            "Parameters": {
            "BatchStrategy": "SingleRecord",
            "DataProcessing": {
                "InputFilter": "$",
                "JoinSource": "Input",
                "OutputFilter": "xxx"
            },
            "Environment": {
                "SAGEMAKER_MODEL_SERVER_TIMEOUT": "300"
            },
            "MaxConcurrentTransforms": 100,
            "MaxPayloadInMB": 1,
            "ModelName": "${model_name}",
            "TransformInput": {
                "DataSource": {
                    "S3DataSource": {
                        "S3DataType": "S3Prefix",
                        "S3Uri": "${s3_input_path}"
                    }
                },
                "ContentType": "application/jsonlines",
                "CompressionType": "Gzip",
                "SplitType": "Line"
            },
            "TransformJobName.$": "$.executionid",
            "TransformOutput": {
                "S3OutputPath": "${s3_output_path}",
                "Accept": "application/jsonlines",
                "AssembleWith": "Line"
            },    
            "TransformResources": {
                "InstanceType": "xxx",
                "InstanceCount": 1
            }
        },
            "End": true
        }
        }
    }

在上面的定义中,lambda 可以是一个函数,它解析我通过参数部分传递的执行 ID:

 def lambda_handler(event, context):
    return(event.get('executionId').split(':')[-1])

或者,如果您不想传递执行 ID,它可以简单地 return 随机字符串,例如

 import string
 def lambda_handler(event, context):
    return(string.ascii_uppercase + string.digits)

您可以生成各种随机字符串或在 lambda 中生成任何内容并将其传递给转换作业名称。

我想抛出另一个想法。 如果适用,您还可以使用上一个任务中的另一个 executionId 或其他唯一标识符。

我在 GlueJob 成功后触发 BatchTransform 作业。 因此,我可以获取输出变量并在 BatchTransform 作业中连接以使用新的 TransformJobName。

"TransformJobName.$": "States.Format('scoring-titanic-{}', $.CompletedOn)"