如何使用for循环在PostgresOperator Airflow中传递参数
How to pass parameter in PostgresOperator Airflow using for loop
我正在使用 PostgresOperator,我想将 table 名称后缀传递给我的 SQL 查询,这样当它查询数据时,它会从 for 循环迭代中动态读取
for country in countries:
matchTimeStamp = ShortCircuitOperator(task_id='Match_Updated_dates_{}'.format(country), provide_context=True,
python_callable=match_dates,op_kwargs={'key1': country}, default_args=default_args)
正如您所见,我在 task_id 中通过了 .format(country)。我想通过像下面的 SQL 声明一样传递国家名称来做类似的事情,但似乎 Airflow 不喜欢它。请推荐一个正确的方法
最后,我在 SQL 语句
的末尾传递了 .format country
import_redshift_table = PostgresOperator(
task_id='copy_data_from_redshift_{}'.format(country),
postgres_conn_id='postgres_default',
sql='''
unload ('select * from angaza_public_{}.accounts')
to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{}.csv'
credentials 'aws_access_key_id=AWSDD****HHJJJJ;aws_secret_access_key=ABCDEFDHPASSEORD/JmlGjyEQMVOBme'
DELIMITER ','
HEADER
PARALLEL OFF
'''.format(country))
----- 更新,我找到了解决方案-------- 我在 .format(country,country)
中添加了一个额外的 coutry 关键字
import_redshift_table = PostgresOperator(
task_id='copy_data_from_redshift_{}'.format(country),
postgres_conn_id='postgres_default',
sql='''
unload ('select * from angaza_public_{}.accounts')
to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{}.csv'
credentials 'aws_access_key_id=AKIA6J7OV4FRSYH6DIXL;aws_secret_access_key=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'
DELIMITER ','
HEADER
PARALLEL OFF
'''.format(country, country))
你有一个额外的括号导致它不起作用。另外,我认为 f-string 比 .format 更具可读性。有了这个,它将起作用:
import_redshift_table = PostgresOperator(
task_id=f'copy_data_from_redshift_{country}',
postgres_conn_id='postgres_default', # this is not necessary if its the default
sql=f"""
unload ('select * from angaza_public_{country}.accounts')
to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{country}.csv'
credentials 'aws_access_key_id=AWSDD****HHJJJJ;aws_secret_access_key=ABCDEFDHPASSEORD/JmlGjyEQMVOBme'
DELIMITER ','
HEADER
PARALLEL OFF
"""
顺便说一下,使用 IAM 角色而不是凭据从 Redshift 卸载是一个很好的做法,这样它们就不会出现在日志中。
我正在使用 PostgresOperator,我想将 table 名称后缀传递给我的 SQL 查询,这样当它查询数据时,它会从 for 循环迭代中动态读取
for country in countries:
matchTimeStamp = ShortCircuitOperator(task_id='Match_Updated_dates_{}'.format(country), provide_context=True,
python_callable=match_dates,op_kwargs={'key1': country}, default_args=default_args)
正如您所见,我在 task_id 中通过了 .format(country)。我想通过像下面的 SQL 声明一样传递国家名称来做类似的事情,但似乎 Airflow 不喜欢它。请推荐一个正确的方法 最后,我在 SQL 语句
的末尾传递了 .format countryimport_redshift_table = PostgresOperator(
task_id='copy_data_from_redshift_{}'.format(country),
postgres_conn_id='postgres_default',
sql='''
unload ('select * from angaza_public_{}.accounts')
to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{}.csv'
credentials 'aws_access_key_id=AWSDD****HHJJJJ;aws_secret_access_key=ABCDEFDHPASSEORD/JmlGjyEQMVOBme'
DELIMITER ','
HEADER
PARALLEL OFF
'''.format(country))
----- 更新,我找到了解决方案-------- 我在 .format(country,country)
中添加了一个额外的 coutry 关键字 import_redshift_table = PostgresOperator(
task_id='copy_data_from_redshift_{}'.format(country),
postgres_conn_id='postgres_default',
sql='''
unload ('select * from angaza_public_{}.accounts')
to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{}.csv'
credentials 'aws_access_key_id=AKIA6J7OV4FRSYH6DIXL;aws_secret_access_key=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'
DELIMITER ','
HEADER
PARALLEL OFF
'''.format(country, country))
你有一个额外的括号导致它不起作用。另外,我认为 f-string 比 .format 更具可读性。有了这个,它将起作用:
import_redshift_table = PostgresOperator(
task_id=f'copy_data_from_redshift_{country}',
postgres_conn_id='postgres_default', # this is not necessary if its the default
sql=f"""
unload ('select * from angaza_public_{country}.accounts')
to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{country}.csv'
credentials 'aws_access_key_id=AWSDD****HHJJJJ;aws_secret_access_key=ABCDEFDHPASSEORD/JmlGjyEQMVOBme'
DELIMITER ','
HEADER
PARALLEL OFF
"""
顺便说一下,使用 IAM 角色而不是凭据从 Redshift 卸载是一个很好的做法,这样它们就不会出现在日志中。