AWS Glue 中的可选作业参数?

Optional job parameter in AWS Glue?

如何为 AWS Glue 作业实施可选参数?

我创建了一个作业,该作业目前有一个字符串参数(ISO 8601 日期字符串)作为 ETL 作业中使用的输入。我想将此参数设为可选,以便作业在未提供时使用默认值(例如使用 datetime.now and datetime.isoformatin my case). I have tried using getResolvedOptions:

import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['ISO_8601_STRING'])

但是,当我没有传递 --ISO_8601_STRING 作业参数时,我看到以下错误:

awsglue.utils.GlueArgumentError: argument --ISO_8601_STRING is required

我看不到有可选参数的方法,但你可以在作业本身上指定默认参数,然后如果你在 运行 作业时不传递该参数,你的作业将收到默认值(注意默认值不能为空)。

使用可选参数有一个解决方法。这个想法是在解决参数之前检查参数(Scala):

val argName = 'ISO_8601_STRING'
var argValue = null
if (sysArgs.contains(s"--$argName"))
   argValue = GlueArgParser.getResolvedOptions(sysArgs, Array(argName))(argName)

移植到 Python 解决了我的问题:

if ('--{}'.format('ISO_8601_STRING') in sys.argv):
    args = getResolvedOptions(sys.argv, ['ISO_8601_STRING'])
else:
    args = {'ISO_8601_STRING': datetime.datetime.now().isoformat()}

如果您使用接口,您必须提供以“--”开头的参数名称,如“--TABLE_NAME”,而不是"TABLE_NAME",然后您可以像下面这样使用它们(python) 代码:

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TABLE_NAME'])
table_name = args['TABLE_NAME']

matsev and 如果您只有一个可选字段,解决方案很好。

我为 python 编写了一个包装函数,它更通用,可以处理不同的极端情况(必填字段 and/or 带值的可选字段)。

import sys    
from awsglue.utils import getResolvedOptions

def get_glue_args(mandatory_fields, default_optional_args):
    """
    This is a wrapper of the glue function getResolvedOptions to take care of the following case :
    * Handling optional arguments and/or mandatory arguments
    * Optional arguments with default value
    NOTE: 
        * DO NOT USE '-' while defining args as the getResolvedOptions with replace them with '_'
        * All fields would be return as a string type with getResolvedOptions

    Arguments:
        mandatory_fields {list} -- list of mandatory fields for the job
        default_optional_args {dict} -- dict for optional fields with their default value

    Returns:
        dict -- given args with default value of optional args not filled
    """
    # The glue args are available in sys.argv with an extra '--'
    given_optional_fields_key = list(set([i[2:] for i in sys.argv]).intersection([i for i in default_optional_args]))

    args = getResolvedOptions(sys.argv,
                            mandatory_fields+given_optional_fields_key)

    # Overwrite default value if optional args are provided
    default_optional_args.update(args)

    return default_optional_args

用法:

# Defining mandatory/optional args
mandatory_fields = ['my_mandatory_field_1','my_mandatory_field_2']
default_optional_args = {'optional_field_1':'myvalue1', 'optional_field_2':'myvalue2'}
# Retrieve args
args = get_glue_args(mandatory_fields, default_optional_args)
# Access element as dict with args[‘key’]

在函数中包装

def get_glue_env_var(key, default="none"):
    if f'--{key}' in sys.argv:
        return getResolvedOptions(sys.argv, [key])[key]
    else:
        return default

可以创建一个 Step Function 来启动具有不同参数的相同 Glue 作业。状态机以选择状态开始,并根据存在的输入使用不同数量的输入。

stepFunctions:
  stateMachines:
    taskMachine:
      role:
        Fn::GetAtt: [ TaskExecutor, Arn ]
      name: ${self:service}-${opt:stage}
      definition:
        StartAt: DefaultOrNot
        States:

          DefaultOrNot:
            Type: Choice
            Choices:
              - Variable: "$.optional_input"
                IsPresent: false
                Next: DefaultTask
              - Variable: "$. optional_input"
                IsPresent: true
                Next: OptionalTask

          OptionalTask:
            Type: Task
            Resource:  "arn:aws:states:::glue:startJobRun.task0"
            Parameters:
              JobName: ${self:service}-${opt:stage}
              Arguments:
                '--log_group.$': "$.specs.log_group"
                '--log_stream.$': "$.specs.log_stream"
                '--optional_input.$': "$. optional_input"

            Catch:
              - ErrorEquals: [ 'States.TaskFailed' ]
                ResultPath: "$.errorInfo"
                Next: TaskFailed
            Next: ExitExecution


          DefaultTask:
            Type: Task
            Resource:  "arn:aws:states:::glue:startJobRun.sync"
            Parameters:
              JobName: ${self:service}-${opt:stage}
              Arguments:
                '--log_group.$': "$.specs.log_group"
                '--log_stream.$': "$.specs.log_stream"


            Catch:
              - ErrorEquals: [ 'States.TaskFailed' ]
                ResultPath: "$.errorInfo"
                Next: TaskFailed
            Next: ExitExecution

          TaskFailed:
            Type: Fail
            Error: "Failure"

          ExitExecution:
            Type: Pass
            End: True