AWS Glue 中的可选作业参数?
Optional job parameter in AWS Glue?
如何为 AWS Glue 作业实施可选参数?
我创建了一个作业,该作业目前有一个字符串参数(ISO 8601 日期字符串)作为 ETL 作业中使用的输入。我想将此参数设为可选,以便作业在未提供时使用默认值(例如使用 datetime.now and datetime.isoformatin my case). I have tried using getResolvedOptions:
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv, ['ISO_8601_STRING'])
但是,当我没有传递 --ISO_8601_STRING
作业参数时,我看到以下错误:
awsglue.utils.GlueArgumentError: argument --ISO_8601_STRING is required
我看不到有可选参数的方法,但你可以在作业本身上指定默认参数,然后如果你在 运行 作业时不传递该参数,你的作业将收到默认值(注意默认值不能为空)。
使用可选参数有一个解决方法。这个想法是在解决参数之前检查参数(Scala):
val argName = 'ISO_8601_STRING'
var argValue = null
if (sysArgs.contains(s"--$argName"))
argValue = GlueArgParser.getResolvedOptions(sysArgs, Array(argName))(argName)
将 移植到 Python 解决了我的问题:
if ('--{}'.format('ISO_8601_STRING') in sys.argv):
args = getResolvedOptions(sys.argv, ['ISO_8601_STRING'])
else:
args = {'ISO_8601_STRING': datetime.datetime.now().isoformat()}
如果您使用接口,您必须提供以“--”开头的参数名称,如“--TABLE_NAME”,而不是"TABLE_NAME",然后您可以像下面这样使用它们(python) 代码:
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TABLE_NAME'])
table_name = args['TABLE_NAME']
matsev and 如果您只有一个可选字段,解决方案很好。
我为 python 编写了一个包装函数,它更通用,可以处理不同的极端情况(必填字段 and/or 带值的可选字段)。
import sys
from awsglue.utils import getResolvedOptions
def get_glue_args(mandatory_fields, default_optional_args):
"""
This is a wrapper of the glue function getResolvedOptions to take care of the following case :
* Handling optional arguments and/or mandatory arguments
* Optional arguments with default value
NOTE:
* DO NOT USE '-' while defining args as the getResolvedOptions with replace them with '_'
* All fields would be return as a string type with getResolvedOptions
Arguments:
mandatory_fields {list} -- list of mandatory fields for the job
default_optional_args {dict} -- dict for optional fields with their default value
Returns:
dict -- given args with default value of optional args not filled
"""
# The glue args are available in sys.argv with an extra '--'
given_optional_fields_key = list(set([i[2:] for i in sys.argv]).intersection([i for i in default_optional_args]))
args = getResolvedOptions(sys.argv,
mandatory_fields+given_optional_fields_key)
# Overwrite default value if optional args are provided
default_optional_args.update(args)
return default_optional_args
用法:
# Defining mandatory/optional args
mandatory_fields = ['my_mandatory_field_1','my_mandatory_field_2']
default_optional_args = {'optional_field_1':'myvalue1', 'optional_field_2':'myvalue2'}
# Retrieve args
args = get_glue_args(mandatory_fields, default_optional_args)
# Access element as dict with args[‘key’]
在函数中包装 :
def get_glue_env_var(key, default="none"):
if f'--{key}' in sys.argv:
return getResolvedOptions(sys.argv, [key])[key]
else:
return default
可以创建一个 Step Function 来启动具有不同参数的相同 Glue 作业。状态机以选择状态开始,并根据存在的输入使用不同数量的输入。
stepFunctions:
stateMachines:
taskMachine:
role:
Fn::GetAtt: [ TaskExecutor, Arn ]
name: ${self:service}-${opt:stage}
definition:
StartAt: DefaultOrNot
States:
DefaultOrNot:
Type: Choice
Choices:
- Variable: "$.optional_input"
IsPresent: false
Next: DefaultTask
- Variable: "$. optional_input"
IsPresent: true
Next: OptionalTask
OptionalTask:
Type: Task
Resource: "arn:aws:states:::glue:startJobRun.task0"
Parameters:
JobName: ${self:service}-${opt:stage}
Arguments:
'--log_group.$': "$.specs.log_group"
'--log_stream.$': "$.specs.log_stream"
'--optional_input.$': "$. optional_input"
Catch:
- ErrorEquals: [ 'States.TaskFailed' ]
ResultPath: "$.errorInfo"
Next: TaskFailed
Next: ExitExecution
DefaultTask:
Type: Task
Resource: "arn:aws:states:::glue:startJobRun.sync"
Parameters:
JobName: ${self:service}-${opt:stage}
Arguments:
'--log_group.$': "$.specs.log_group"
'--log_stream.$': "$.specs.log_stream"
Catch:
- ErrorEquals: [ 'States.TaskFailed' ]
ResultPath: "$.errorInfo"
Next: TaskFailed
Next: ExitExecution
TaskFailed:
Type: Fail
Error: "Failure"
ExitExecution:
Type: Pass
End: True
如何为 AWS Glue 作业实施可选参数?
我创建了一个作业,该作业目前有一个字符串参数(ISO 8601 日期字符串)作为 ETL 作业中使用的输入。我想将此参数设为可选,以便作业在未提供时使用默认值(例如使用 datetime.now and datetime.isoformatin my case). I have tried using getResolvedOptions:
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv, ['ISO_8601_STRING'])
但是,当我没有传递 --ISO_8601_STRING
作业参数时,我看到以下错误:
awsglue.utils.GlueArgumentError: argument --ISO_8601_STRING is required
我看不到有可选参数的方法,但你可以在作业本身上指定默认参数,然后如果你在 运行 作业时不传递该参数,你的作业将收到默认值(注意默认值不能为空)。
使用可选参数有一个解决方法。这个想法是在解决参数之前检查参数(Scala):
val argName = 'ISO_8601_STRING'
var argValue = null
if (sysArgs.contains(s"--$argName"))
argValue = GlueArgParser.getResolvedOptions(sysArgs, Array(argName))(argName)
将
if ('--{}'.format('ISO_8601_STRING') in sys.argv):
args = getResolvedOptions(sys.argv, ['ISO_8601_STRING'])
else:
args = {'ISO_8601_STRING': datetime.datetime.now().isoformat()}
如果您使用接口,您必须提供以“--”开头的参数名称,如“--TABLE_NAME”,而不是"TABLE_NAME",然后您可以像下面这样使用它们(python) 代码:
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TABLE_NAME'])
table_name = args['TABLE_NAME']
matsev and
我为 python 编写了一个包装函数,它更通用,可以处理不同的极端情况(必填字段 and/or 带值的可选字段)。
import sys
from awsglue.utils import getResolvedOptions
def get_glue_args(mandatory_fields, default_optional_args):
"""
This is a wrapper of the glue function getResolvedOptions to take care of the following case :
* Handling optional arguments and/or mandatory arguments
* Optional arguments with default value
NOTE:
* DO NOT USE '-' while defining args as the getResolvedOptions with replace them with '_'
* All fields would be return as a string type with getResolvedOptions
Arguments:
mandatory_fields {list} -- list of mandatory fields for the job
default_optional_args {dict} -- dict for optional fields with their default value
Returns:
dict -- given args with default value of optional args not filled
"""
# The glue args are available in sys.argv with an extra '--'
given_optional_fields_key = list(set([i[2:] for i in sys.argv]).intersection([i for i in default_optional_args]))
args = getResolvedOptions(sys.argv,
mandatory_fields+given_optional_fields_key)
# Overwrite default value if optional args are provided
default_optional_args.update(args)
return default_optional_args
用法:
# Defining mandatory/optional args
mandatory_fields = ['my_mandatory_field_1','my_mandatory_field_2']
default_optional_args = {'optional_field_1':'myvalue1', 'optional_field_2':'myvalue2'}
# Retrieve args
args = get_glue_args(mandatory_fields, default_optional_args)
# Access element as dict with args[‘key’]
在函数中包装
def get_glue_env_var(key, default="none"):
if f'--{key}' in sys.argv:
return getResolvedOptions(sys.argv, [key])[key]
else:
return default
可以创建一个 Step Function 来启动具有不同参数的相同 Glue 作业。状态机以选择状态开始,并根据存在的输入使用不同数量的输入。
stepFunctions:
stateMachines:
taskMachine:
role:
Fn::GetAtt: [ TaskExecutor, Arn ]
name: ${self:service}-${opt:stage}
definition:
StartAt: DefaultOrNot
States:
DefaultOrNot:
Type: Choice
Choices:
- Variable: "$.optional_input"
IsPresent: false
Next: DefaultTask
- Variable: "$. optional_input"
IsPresent: true
Next: OptionalTask
OptionalTask:
Type: Task
Resource: "arn:aws:states:::glue:startJobRun.task0"
Parameters:
JobName: ${self:service}-${opt:stage}
Arguments:
'--log_group.$': "$.specs.log_group"
'--log_stream.$': "$.specs.log_stream"
'--optional_input.$': "$. optional_input"
Catch:
- ErrorEquals: [ 'States.TaskFailed' ]
ResultPath: "$.errorInfo"
Next: TaskFailed
Next: ExitExecution
DefaultTask:
Type: Task
Resource: "arn:aws:states:::glue:startJobRun.sync"
Parameters:
JobName: ${self:service}-${opt:stage}
Arguments:
'--log_group.$': "$.specs.log_group"
'--log_stream.$': "$.specs.log_stream"
Catch:
- ErrorEquals: [ 'States.TaskFailed' ]
ResultPath: "$.errorInfo"
Next: TaskFailed
Next: ExitExecution
TaskFailed:
Type: Fail
Error: "Failure"
ExitExecution:
Type: Pass
End: True