通过 CLI 访问传递给 Airflow 的配置参数
Accessing configuration parameters passed to Airflow through CLI
我试图在触发 dag 运行 时将以下配置参数传递给 Airflow CLI。以下是我正在使用的 trigger_dag 命令。
airflow trigger_dag -c '{"account_list":"[1,2,3,4,5]", "start_date":"2016-04-25"}' insights_assembly_9900
我的问题是如何访问 dag 运行.
中运算符内部传递的 con 参数
有两种方法可以访问airflow trigger_dag
命令中传递的参数。
在PythonOperator中定义的可调用方法中,可以访问参数为kwargs['dag_run'].conf.get('account_list')
鉴于您正在使用这个东西的字段是模板字段,可以使用 {{ dag_run.conf['account_list'] }}
外部可触发 DAG 的 schedule_interval
设置为 None
上述工作方法
这可能是 devj
提供的答案的延续。
在 airflow.cfg
时,以下 属性 应设置为 true:
dag_run_conf_overrides_params=True
定义 PythonOperator 时,传递以下参数 provide_context=True
。例如:
get_row_count_operator = PythonOperator(task_id='get_row_count', python_callable=do_work, dag=dag, provide_context=True)
- 定义python可调用函数(注意
**kwargs
的使用):
def do_work(**kwargs):
table_name = kwargs['dag_run'].conf.get('table_name')
# Rest of the code
- 从命令行调用 dag:
airflow trigger_dag read_hive --conf '{"table_name":"my_table_name"}'
我发现 this 讨论很有帮助。
如果您尝试访问 Airflow 系统范围的配置(而不是 DAG 配置),以下内容可能会有所帮助:
首先,导入这个
from airflow.configuration import conf
其次,获取某处的值
conf.get("core", "my_key")
可以,设置一个值
conf.set("core", "my_key", "my_val")
对于我的用例,我必须使用 API 将参数传递给气流工作流(或任务)。我的工作流程如下:当一个新文件登陆 S3 存储桶时触发 Lambda,Lambda 依次触发一个气流 DAG 并传递存储桶名称和文件的密钥。
这是我的解决方案:
s3 = boto3.client('s3')
mwaa = boto3.client('mwaa')
def lambda_handler(event, context):
# print("Received event: " + json.dumps(event, indent=2))
# Get the object from the event and show its content type
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
mwaa_cli_token = mwaa.create_cli_token(
Name=mwaa_env_name
)
mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
conf = {'bucket': bucket, 'key': key}
raw_data = """{0} {1} --conf '{2}'""".format(mwaa_cli_command, dag_name, json.dumps(conf))
# pass the key and bucket name to airflow to initiate the workflow
requests.post(
mwaa_webserver_hostname,
headers={
'Authorization': mwaa_auth_token,
'Content-Type': 'text/plain'
},
data=raw_data
)
我试图在触发 dag 运行 时将以下配置参数传递给 Airflow CLI。以下是我正在使用的 trigger_dag 命令。
airflow trigger_dag -c '{"account_list":"[1,2,3,4,5]", "start_date":"2016-04-25"}' insights_assembly_9900
我的问题是如何访问 dag 运行.
中运算符内部传递的 con 参数有两种方法可以访问airflow trigger_dag
命令中传递的参数。
在PythonOperator中定义的可调用方法中,可以访问参数为
kwargs['dag_run'].conf.get('account_list')
鉴于您正在使用这个东西的字段是模板字段,可以使用
{{ dag_run.conf['account_list'] }}
外部可触发 DAG 的 schedule_interval
设置为 None
上述工作方法
这可能是 devj
提供的答案的延续。
在
airflow.cfg
时,以下 属性 应设置为 true:dag_run_conf_overrides_params=True
定义 PythonOperator 时,传递以下参数
provide_context=True
。例如:
get_row_count_operator = PythonOperator(task_id='get_row_count', python_callable=do_work, dag=dag, provide_context=True)
- 定义python可调用函数(注意
**kwargs
的使用):
def do_work(**kwargs): table_name = kwargs['dag_run'].conf.get('table_name') # Rest of the code
- 从命令行调用 dag:
airflow trigger_dag read_hive --conf '{"table_name":"my_table_name"}'
我发现 this 讨论很有帮助。
如果您尝试访问 Airflow 系统范围的配置(而不是 DAG 配置),以下内容可能会有所帮助:
首先,导入这个
from airflow.configuration import conf
其次,获取某处的值
conf.get("core", "my_key")
可以,设置一个值
conf.set("core", "my_key", "my_val")
对于我的用例,我必须使用 API 将参数传递给气流工作流(或任务)。我的工作流程如下:当一个新文件登陆 S3 存储桶时触发 Lambda,Lambda 依次触发一个气流 DAG 并传递存储桶名称和文件的密钥。
这是我的解决方案:
s3 = boto3.client('s3')
mwaa = boto3.client('mwaa')
def lambda_handler(event, context):
# print("Received event: " + json.dumps(event, indent=2))
# Get the object from the event and show its content type
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
mwaa_cli_token = mwaa.create_cli_token(
Name=mwaa_env_name
)
mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
conf = {'bucket': bucket, 'key': key}
raw_data = """{0} {1} --conf '{2}'""".format(mwaa_cli_command, dag_name, json.dumps(conf))
# pass the key and bucket name to airflow to initiate the workflow
requests.post(
mwaa_webserver_hostname,
headers={
'Authorization': mwaa_auth_token,
'Content-Type': 'text/plain'
},
data=raw_data
)