如何在 Airflow 中使用 last_mod_dt(时间戳)从 oracle 数据源执行每小时增量提取?
How to perform hourly incremental extracts from an oracle datasource using last_mod_dt (timestamp) in Airflow?
需要每小时 table refresh/loads 使用 Airflow/Python 从 oracle 数据源使用 last_modified_dt 时间戳列。
在 Airflow 中有 airflow.models.taskinstance API 公开来自 task_instance 元数据 table 的数据并且具有以下字段(与示例数据一起显示),假设 dag/tasks 第一次执行 date/time 是 1/1/2020 05:00 :-
task_id, dag_id, execution_datetime (of dag), start_date, end_date, duration, state, ....
task_a, oracle, 1/1/2020 05:00:00, 1/1/2020 05:00:00, 1/1/2020 05:05:00, 0.5, success, ....
task_b, oracle, 1/1/2020 05:00:00, 1/1/2020 05:01:00, 1/1/2020 05:04:00, 0.3, success, ....
task_c, oracle, 1/1/202005:00:00, 1/1/2020 05:02:00, 1/1/2020 05:06:00, 0.4, success, ....
所以,我正在考虑使用此 task_instance 元数据 table 或 API 来获取每个任务的前一个开始日期时间及其状态(成功),并在条件下使用它如下所示:
所以,当 运行 在 2020 年 1 月 1 日一小时后 06:00:00 :-
select * from table_a where last_mod_dttm > prev(start_datetime of task_id=task_a) and state = sucesss;
select * from table_b where last_mod_dttm > prev(start_datetime of task_id=task_b) and state = sucesss;
select * from table_c where last_mod_dttm > prev(start_datetime of task_id=task_c) and state = sucesss;
这种做法对吗?如果是,那么每次直接查询气流元数据 task_instance table 以获得任务的前一个或最大值(start_datetime)会对性能产生影响吗?如果是,那么我们如何通过 airflow.models.taskinstance API (https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html)
获取任务的先前 start_datetime 和“成功”状态
谢谢!
首先了解 execution_date
的工作原理很重要,请参阅 Scheduler Doc:
The scheduler won’t trigger your tasks until the period it covers has
ended e.g., A job with schedule_interval set as @daily runs after the
day has ended. This technique makes sure that whatever data is
required for that period is fully available before the dag is
executed. In the UI, it appears as if Airflow is running your tasks a
day late.
If you run a DAG on a schedule_interval of one day, the run with
execution_date 2019-11-21 triggers soon after 2019-11-21T23:59.
Let’s Repeat That, the scheduler runs your job one schedule_interval
AFTER the start date, at the END of the period.
这意味着通过引用 execution_date
你可以得到最后一个 运行 被触发的确切时间。
关于查询,我不会查询数据库来获取最后执行日期,而是使用 Airflow 开箱即用的宏 - see this reference:
您应该可以在查询中使用 {{ execution_date }}
,当 DAG 运行 被触发时,Airflow 应该会替换它。
需要每小时 table refresh/loads 使用 Airflow/Python 从 oracle 数据源使用 last_modified_dt 时间戳列。
在 Airflow 中有 airflow.models.taskinstance API 公开来自 task_instance 元数据 table 的数据并且具有以下字段(与示例数据一起显示),假设 dag/tasks 第一次执行 date/time 是 1/1/2020 05:00 :-
task_id, dag_id, execution_datetime (of dag), start_date, end_date, duration, state, ....
task_a, oracle, 1/1/2020 05:00:00, 1/1/2020 05:00:00, 1/1/2020 05:05:00, 0.5, success, ....
task_b, oracle, 1/1/2020 05:00:00, 1/1/2020 05:01:00, 1/1/2020 05:04:00, 0.3, success, ....
task_c, oracle, 1/1/202005:00:00, 1/1/2020 05:02:00, 1/1/2020 05:06:00, 0.4, success, ....
所以,我正在考虑使用此 task_instance 元数据 table 或 API 来获取每个任务的前一个开始日期时间及其状态(成功),并在条件下使用它如下所示:
所以,当 运行 在 2020 年 1 月 1 日一小时后 06:00:00 :-
select * from table_a where last_mod_dttm > prev(start_datetime of task_id=task_a) and state = sucesss;
select * from table_b where last_mod_dttm > prev(start_datetime of task_id=task_b) and state = sucesss;
select * from table_c where last_mod_dttm > prev(start_datetime of task_id=task_c) and state = sucesss;
这种做法对吗?如果是,那么每次直接查询气流元数据 task_instance table 以获得任务的前一个或最大值(start_datetime)会对性能产生影响吗?如果是,那么我们如何通过 airflow.models.taskinstance API (https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html)
获取任务的先前 start_datetime 和“成功”状态谢谢!
首先了解 execution_date
的工作原理很重要,请参阅 Scheduler Doc:
The scheduler won’t trigger your tasks until the period it covers has ended e.g., A job with schedule_interval set as @daily runs after the day has ended. This technique makes sure that whatever data is required for that period is fully available before the dag is executed. In the UI, it appears as if Airflow is running your tasks a day late.
If you run a DAG on a schedule_interval of one day, the run with execution_date 2019-11-21 triggers soon after 2019-11-21T23:59.
Let’s Repeat That, the scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.
这意味着通过引用 execution_date
你可以得到最后一个 运行 被触发的确切时间。
关于查询,我不会查询数据库来获取最后执行日期,而是使用 Airflow 开箱即用的宏 - see this reference:
您应该可以在查询中使用 {{ execution_date }}
,当 DAG 运行 被触发时,Airflow 应该会替换它。