Airflow 会从函数参数中的 ds 或 **kwargs 读取吗

Will Airflow read from ds or **kwargs in function parameters

大家好,我有一个功能

def get_campaign_active(ds, **kwargs):
    logging.info('Checking for inactive campaign types..')
    the_db = ds['client']
    db = the_db['misc-server']
    collection = db.campaigntypes
    campaign = list(collection.find({})) 
    for item in campaign:
        if item['active'] == False:
            # storing false 'active' campaigns
            result = "'{}' active status set to False".format(item['text'])
            logging.info("'{}' active status set to False".format(item['text']))

映射到气流任务

get_campaign_active = PythonOperator(
    task_id='get_campaign_active',
    provide_context=True,
    python_callable=get_campaign_active,
    xcom_push=True,
    op_kwargs={'client': client_production},
    dag=dag)

如您所见,我在任务中将 client_production 变量传入 op_kwargs。当此任务在气流中 运行 时,希望该变量通过函数中的 '**kwargs' 参数传入。

但是为了测试,当我尝试像这样调用函数时

get_campaign_active({"client":client_production})

client_production 变量位于 ds 参数内。我没有 airflow 的临时服务器来测试它,但是有人可以告诉我如果我将这个 function/task 部署到 airflow,它会从 dskwargs?

现在如果我尝试访问 kwargs 中的 'client' 键,kwargs 是空的。

谢谢

你应该这样做:

def get_campaign_active(ds, **kwargs):
    logging.info('Checking for inactive campaign types..')
    the_db = kwargs['client']

ds(以及所有其他 macros 在您设置 provide_context=True 时传递给 kwargs,您可以像您一样使用命名参数或让 ds 传递给 kwargs还有)

由于在您的代码中您实际上并未使用 ds 或任何其他宏,因此您可以将函数签名更改为 get_campaign_active(**kwargs) 并删除 provide_context=True。请注意,从 Airflow>=2.0 开始,根本不需要 provide_context=True