即使 DAG 不是 运行,气流变量也会更新
Airflow variables getting updated even if the DAG is not running
我正在从气流变量中读取一个整数变量,然后每次 DAG 运行时将该值递增 1,然后再次将其设置为该变量。
但是在下面的代码之后,UI 处的变量每次刷新页面时都会发生变化。
不知道是什么导致了这种行为
counter = Variable.get('counter')
s = BashOperator(
task_id='echo_start_variable',
bash_command='echo ' + counter,
dag=dag,
)
Variable.set("counter", int(counter) + 1)
sql_query = "SELECT * FROM UNNEST(SEQUENCE({start}, {end}))"
sql_query = sql_query.replace('{start}', start).replace('{end}', end)
submit_query = PythonOperator(
task_id='submit_athena_query',
python_callable=run_athena_query,
op_kwargs={'query': sql_query, 'db': 'db',
's3_output': 's3://s3-path/rohan/date=' + current_date + '/'},
dag=dag)
e = BashOperator(
task_id='echo_end_variable',
bash_command='echo ' + counter,
dag=dag,
)
s >> submit_query >> e
气流处理 DAG 文件每 30 秒一次(min_file_process_interval
设置的默认值)这意味着您拥有的任何顶级代码每 30 秒 运行 所以 Variable.set("counter", int(counter) + 1)
将使变量计数器每 30 秒增加 1。
在顶层代码中与变量交互是一种不好的做法(不管增加价值的问题)。它每 30 秒打开一个到 Metastore 数据库的连接,这可能会导致严重的问题并使数据库不堪重负。
要获取 Variable 的值,您可以使用 Jinja:
e = BashOperator(
task_id='echo_end_variable',
bash_command='echo {{ var.value.counter }}',
dag=dag,
)
这是使用变量的安全方法,因为只有在执行运算符时才会检索值。
如果要将变量的值增加 1,请使用 PythonOpeartor
:
def increase():
counter = Variable.get('counter')
Variable.set("counter", int(counter) + 1)
increase_op = PythonOperator(
task_id='increase_task',
python_callable=increase,
dag=dag)
python 可调用函数仅在运算符运行时执行。
我正在从气流变量中读取一个整数变量,然后每次 DAG 运行时将该值递增 1,然后再次将其设置为该变量。
但是在下面的代码之后,UI 处的变量每次刷新页面时都会发生变化。 不知道是什么导致了这种行为
counter = Variable.get('counter')
s = BashOperator(
task_id='echo_start_variable',
bash_command='echo ' + counter,
dag=dag,
)
Variable.set("counter", int(counter) + 1)
sql_query = "SELECT * FROM UNNEST(SEQUENCE({start}, {end}))"
sql_query = sql_query.replace('{start}', start).replace('{end}', end)
submit_query = PythonOperator(
task_id='submit_athena_query',
python_callable=run_athena_query,
op_kwargs={'query': sql_query, 'db': 'db',
's3_output': 's3://s3-path/rohan/date=' + current_date + '/'},
dag=dag)
e = BashOperator(
task_id='echo_end_variable',
bash_command='echo ' + counter,
dag=dag,
)
s >> submit_query >> e
气流处理 DAG 文件每 30 秒一次(min_file_process_interval
设置的默认值)这意味着您拥有的任何顶级代码每 30 秒 运行 所以 Variable.set("counter", int(counter) + 1)
将使变量计数器每 30 秒增加 1。
在顶层代码中与变量交互是一种不好的做法(不管增加价值的问题)。它每 30 秒打开一个到 Metastore 数据库的连接,这可能会导致严重的问题并使数据库不堪重负。
要获取 Variable 的值,您可以使用 Jinja:
e = BashOperator(
task_id='echo_end_variable',
bash_command='echo {{ var.value.counter }}',
dag=dag,
)
这是使用变量的安全方法,因为只有在执行运算符时才会检索值。
如果要将变量的值增加 1,请使用 PythonOpeartor
:
def increase():
counter = Variable.get('counter')
Variable.set("counter", int(counter) + 1)
increase_op = PythonOperator(
task_id='increase_task',
python_callable=increase,
dag=dag)
python 可调用函数仅在运算符运行时执行。