How to parse a Step Functions executionId into a SageMaker batch transform job name?
I created a Step Functions state machine. Its definition (step-function.json) is used in Terraform (using the syntax from this page: https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTransformJob.html).
The first time I execute this state machine, it creates a SageMaker batch transform job named example-jobname. However, I need to execute the state machine every day, and on later runs it gives me the error "error": "SageMaker.ResourceInUseException", "cause": "Job name must be unique within an AWS account and region, and a job with this name already exists.
The reason is that the job name is hard-coded as example-jobname, so any execution after the first one fails because the job name has to be unique. I would like to know how to append a string (something like the ExecutionId) to the end of the job name. Here is what I have tried:
I added "executionId.$": "States.Format('somestring {}', $$.Execution.Id)" to the Parameters section of the JSON file, but when I execute the task I get the error "error": "States.Runtime", "cause": "An error occurred while executing the state 'SageMaker CreateTransformJob' (entered at the event id #2). The Parameters '{\"BatchStrategy\":\"SingleRecord\",..............\"executionId\":\"somestring arn:aws:states:us-east-1:xxxxx:execution:xxxxx-state-machine:xxxxxxxx72950\"}' could not be used to start the Task: [The field \"executionId\" is not supported by Step Functions]"}
I changed the job name in the JSON file to "TransformJobName": "example-jobname-States.Format('somestring {}', $$.Execution.Id)", and when I execute the state machine I get the error: "error": "SageMaker.AmazonSageMakerException", "cause": "2 validation errors detected: Value 'example-jobname-States.Format('somestring {}', $$.Execution.Id)' at 'transformJobName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}; Value 'example-jobname-States.Format('somestring {}', $$.Execution.Id)' at 'transformJobName' failed to satisfy constraint: Member must have length less than or equal to 63
I have really run out of ideas. Can anyone help? Many thanks.
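According to the documentation, parameters that take their value from the context object should be passed in the following format:
"Parameters": {
  "ModelName.$": "$$.Execution.Name",
  ....
},
If you look closely, the ".$" suffix on the key is what is missing from your definition. Without it, the value is treated as a literal string, which is why SageMaker received the text "States.Format(...)" as the job name. Your Step Functions definition should therefore contain either
"TransformJobName.$": "$$.Execution.Id",
or
"TransformJobName.$": "States.Format('mytransformjob{}', $$.Execution.Id)"
Note that $$.Execution.Id is the full execution ARN (as the error output in the question shows), so it contains colons and is likely to be rejected by the job name pattern quoted there. $$.Execution.Name, as in the documentation example above, or a value derived from the ID as shown further down, is probably the safer choice.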
The complete state machine definition:
{
  "Comment": "Defines the statemachine.",
  "StartAt": "Generate Random String",
  "States": {
    "Generate Random String": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-central-1:1234567890:function:randomstring",
      "ResultPath": "$.executionid",
      "Parameters": {
        "executionId.$": "$$.Execution.Id"
      },
      "Next": "SageMaker CreateTransformJob"
    },
    "SageMaker CreateTransformJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createTransformJob.sync",
      "Parameters": {
        "BatchStrategy": "SingleRecord",
        "DataProcessing": {
          "InputFilter": "$",
          "JoinSource": "Input",
          "OutputFilter": "xxx"
        },
        "Environment": {
          "SAGEMAKER_MODEL_SERVER_TIMEOUT": "300"
        },
        "MaxConcurrentTransforms": 100,
        "MaxPayloadInMB": 1,
        "ModelName": "${model_name}",
        "TransformInput": {
          "DataSource": {
            "S3DataSource": {
              "S3DataType": "S3Prefix",
              "S3Uri": "${s3_input_path}"
            }
          },
          "ContentType": "application/jsonlines",
          "CompressionType": "Gzip",
          "SplitType": "Line"
        },
        "TransformJobName.$": "$.executionid",
        "TransformOutput": {
          "S3OutputPath": "${s3_output_path}",
          "Accept": "application/jsonlines",
          "AssembleWith": "Line"
        },
        "TransformResources": {
          "InstanceType": "xxx",
          "InstanceCount": 1
        }
      },
      "End": true
    }
  }
}
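Whichever expression ends up in TransformJobName.$ in a definition like the one above, the resulting string still has to meet the two constraints quoted in the question's error message. The following is only a rough local check, assuming that pattern and the 63-character limit are the whole rule. The ARN and the helper name is_valid_job_name are made up for illustration.

```python
import re

# Pattern and length limit copied from the validation error in the question.
NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}")
MAX_LENGTH = 63


def is_valid_job_name(name: str) -> bool:
    """Return True if the whole string fits the length limit and the pattern."""
    return len(name) <= MAX_LENGTH and NAME_PATTERN.fullmatch(name) is not None


# Hypothetical execution ARN, shaped like the one in the question's error output.
execution_id = (
    "arn:aws:states:us-east-1:123456789012:execution:"
    "my-state-machine:0f1e2d3c-aaaa-bbbb-cccc-1234567890ab"
)
# The execution name is the last colon-separated segment of the ARN.
execution_name = execution_id.split(":")[-1]

print(is_valid_job_name(execution_id))                         # False
print(is_valid_job_name("example-jobname-" + execution_name))  # True
```

The full ARN fails because of its colons and its length, while the prefix plus the execution name passes, which is why the name is usually a better building block than the ID.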
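In the definition above, the Lambda can be a function that parses the execution ID I pass in through the Parameters section:
def lambda_handler(event, context):
    # The execution name is the last colon-separated segment of the execution ARN.
    return event.get('executionId').split(':')[-1]
Or, if you don't want to pass the execution ID, it can simply return a random string, for example:
import random
import string

def lambda_handler(event, context):
    # Pick 12 random uppercase letters and digits (the length is an arbitrary choice).
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=12))
You can generate any kind of random string, or anything else, in the Lambda and pass it to the transform job name.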
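If you want to keep a readable prefix such as example-jobname, the Lambda could also assemble the whole name. This is a sketch rather than a definitive implementation: build_job_name is a hypothetical helper, the prefix is taken from the question, and the 63-character limit comes from the validation error quoted there. Truncating a long name can in principle make two names collide, so keep the prefix short.

```python
import re

MAX_LENGTH = 63  # limit quoted in the SageMaker validation error


def build_job_name(prefix: str, execution_id: str) -> str:
    """Combine a prefix with the execution name taken from the execution ARN."""
    # The execution name is the last colon-separated segment of the ARN.
    execution_name = execution_id.split(":")[-1]
    # Replace every character other than letters, digits and hyphens with a hyphen.
    cleaned = re.sub(r"[^a-zA-Z0-9-]", "-", f"{prefix}-{execution_name}")
    # Trim to the length limit, then drop any leading or trailing hyphens.
    return cleaned[:MAX_LENGTH].strip("-")


def lambda_handler(event, context):
    # "executionId" is the key set in the Parameters of the Lambda task state.
    return build_job_name("example-jobname", event["executionId"])
```

With the state machine shown earlier, the returned string lands in $.executionid and is then used as the TransformJobName.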
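I'd like to throw in another idea.
If applicable, you can also use another executionId or some other unique identifier from a previous task.
I trigger the BatchTransform job after a GlueJob succeeds.
So I can take one of its output variables and concatenate it in the BatchTransform job to form a new TransformJobName.
"TransformJobName.$": "States.Format('scoring-titanic-{}', $.CompletedOn)"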