设置 python_callable 的 PythonOperator 不断执行

PythonOperator with python_callable set gets executed constantly

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from workflow.task import some_task

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['jimin.park1@aig.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'start_date': airflow.utils.dates.days_ago(0)
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('JiminTest', default_args=default_args, schedule_interval='*/1 * * * *', catchup=False)

t1 = PythonOperator(
    task_id='Task1',
    provide_context=True,
    python_callable=some_task,
    dag=dag
)

实际的 some_task 本身只是将时间戳附加到某个文件。正如您在 dag 配置文件中所见,任务本身配置为每 1 分钟 运行。

def some_task(ds, **kwargs):
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("test.txt", "a") as myfile:
        myfile.write(current_time + '\n')

我简单地 tail -f 输出文件并在没有调度程序 运行ning 的情况下启动网络服务器。当网络服务器启动时,这个函数被调用并且东西被附加到文件中。当我启动调度程序时,在每个执行循环中,文件都会被附加。

我想要的是函数按预期每分钟执行一次,而不是每个执行循环。

调度程序将运行每个调度程序循环的每个 DAG 文件,包括所有导入语句。

导入函数的文件中是否有任何 运行ning 代码?

尝试检查配置文件中的 scheduler_heartbeat_sec 配置参数。对于您的情况,它应该小于 60 秒。

如果您希望调度程序不追赶之前的运行,请将 catchup_by_default 设置为 False(不过我不确定这是否与您的问题相关)。

请说明您使用的是哪个 Apache Airflow 版本