Airflow 会从函数参数中的 ds 或 **kwargs 读取吗
Will Airflow read from ds or **kwargs in function parameters
大家好,我有一个功能
def get_campaign_active(ds, **kwargs):
logging.info('Checking for inactive campaign types..')
the_db = ds['client']
db = the_db['misc-server']
collection = db.campaigntypes
campaign = list(collection.find({}))
for item in campaign:
if item['active'] == False:
# storing false 'active' campaigns
result = "'{}' active status set to False".format(item['text'])
logging.info("'{}' active status set to False".format(item['text']))
映射到气流任务
get_campaign_active = PythonOperator(
task_id='get_campaign_active',
provide_context=True,
python_callable=get_campaign_active,
xcom_push=True,
op_kwargs={'client': client_production},
dag=dag)
如您所见,我在任务中将 client_production
变量传入 op_kwargs。当此任务在气流中 运行 时,希望该变量通过函数中的 '**kwargs' 参数传入。
但是为了测试,当我尝试像这样调用函数时
get_campaign_active({"client":client_production})
client_production 变量位于 ds
参数内。我没有 airflow 的临时服务器来测试它,但是有人可以告诉我如果我将这个 function/task 部署到 airflow,它会从 ds
或 kwargs
?
现在如果我尝试访问 kwargs 中的 'client' 键,kwargs 是空的。
谢谢
你应该这样做:
def get_campaign_active(ds, **kwargs):
logging.info('Checking for inactive campaign types..')
the_db = kwargs['client']
ds
(以及所有其他 macros 在您设置 provide_context=True
时传递给 kwargs,您可以像您一样使用命名参数或让 ds 传递给 kwargs还有)
由于在您的代码中您实际上并未使用 ds 或任何其他宏,因此您可以将函数签名更改为 get_campaign_active(**kwargs)
并删除 provide_context=True
。请注意,从 Airflow>=2.0
开始,根本不需要 provide_context=True
。
大家好,我有一个功能
def get_campaign_active(ds, **kwargs):
logging.info('Checking for inactive campaign types..')
the_db = ds['client']
db = the_db['misc-server']
collection = db.campaigntypes
campaign = list(collection.find({}))
for item in campaign:
if item['active'] == False:
# storing false 'active' campaigns
result = "'{}' active status set to False".format(item['text'])
logging.info("'{}' active status set to False".format(item['text']))
映射到气流任务
get_campaign_active = PythonOperator(
task_id='get_campaign_active',
provide_context=True,
python_callable=get_campaign_active,
xcom_push=True,
op_kwargs={'client': client_production},
dag=dag)
如您所见,我在任务中将 client_production
变量传入 op_kwargs。当此任务在气流中 运行 时,希望该变量通过函数中的 '**kwargs' 参数传入。
但是为了测试,当我尝试像这样调用函数时
get_campaign_active({"client":client_production})
client_production 变量位于 ds
参数内。我没有 airflow 的临时服务器来测试它,但是有人可以告诉我如果我将这个 function/task 部署到 airflow,它会从 ds
或 kwargs
?
现在如果我尝试访问 kwargs 中的 'client' 键,kwargs 是空的。
谢谢
你应该这样做:
def get_campaign_active(ds, **kwargs):
logging.info('Checking for inactive campaign types..')
the_db = kwargs['client']
ds
(以及所有其他 macros 在您设置 provide_context=True
时传递给 kwargs,您可以像您一样使用命名参数或让 ds 传递给 kwargs还有)
由于在您的代码中您实际上并未使用 ds 或任何其他宏,因此您可以将函数签名更改为 get_campaign_active(**kwargs)
并删除 provide_context=True
。请注意,从 Airflow>=2.0
开始,根本不需要 provide_context=True
。