Airflow:如何在自定义运算符上访问 {{ execution date }}?
Airflow: How can I access {{ execution date }} on my custom operator?
我在 Airflow 上构建了一个自定义运算符,它调用 API 来获取数据,然后将其写入 BigQuery。但是,问题是我必须将 execution_date 宏作为 API 参数传递才能调用该日期的数据。遗憾的是,当我尝试这样做时,我的操作员无法解析我传递的 jinja 模板。当我检查为此所做的日志记录时,它仅显示如下图所示的模板。
我希望你们能帮忙。
Airflow Logs
这是我的自定义运算符和 dag 的代码。谢谢!
...
class MyOperator(BaseOperator):
def __init__(self,date):
super(MyOperator,self).__init__(*arg,**kwargs)
self.date = date
def __pull_from_api(self):
api_link = "somelink.com/api/date={}".format(self.date)
data = request.get(api_link).json()
return data
def execute(self,context):
data = self.__pull_from_api()
...
dag = DAG('My Pipeline', default_args=default_args)
t1 = MyOperator(date='{{ execution_date}}', task_id='my_pipeline_1', dag=dag)
t1
我鼓励你看看PythonOperator and context dictionary。在 Composer 中使用变量时,两者都很有用。
首先,既然您已经有了自己的自定义运算符,我强烈建议您看一下here。有一些使用 Airflow 的常用宏和模板。因此,您可以找出自己可能失误的地方。
最好的方法是从已经传递给 execute(self,context) 的上下文中获取 execution_date。
例如,我在这里设置执行日期的字符串表示形式:
self.execution_date_str = context["execution_date"].strftime("%Y-%m-%d")
我在 Airflow 上构建了一个自定义运算符,它调用 API 来获取数据,然后将其写入 BigQuery。但是,问题是我必须将 execution_date 宏作为 API 参数传递才能调用该日期的数据。遗憾的是,当我尝试这样做时,我的操作员无法解析我传递的 jinja 模板。当我检查为此所做的日志记录时,它仅显示如下图所示的模板。 我希望你们能帮忙。
Airflow Logs
这是我的自定义运算符和 dag 的代码。谢谢!
...
class MyOperator(BaseOperator):
def __init__(self,date):
super(MyOperator,self).__init__(*arg,**kwargs)
self.date = date
def __pull_from_api(self):
api_link = "somelink.com/api/date={}".format(self.date)
data = request.get(api_link).json()
return data
def execute(self,context):
data = self.__pull_from_api()
...
dag = DAG('My Pipeline', default_args=default_args)
t1 = MyOperator(date='{{ execution_date}}', task_id='my_pipeline_1', dag=dag)
t1
我鼓励你看看PythonOperator and context dictionary。在 Composer 中使用变量时,两者都很有用。
首先,既然您已经有了自己的自定义运算符,我强烈建议您看一下here。有一些使用 Airflow 的常用宏和模板。因此,您可以找出自己可能失误的地方。
最好的方法是从已经传递给 execute(self,context) 的上下文中获取 execution_date。
例如,我在这里设置执行日期的字符串表示形式:
self.execution_date_str = context["execution_date"].strftime("%Y-%m-%d")