设置 python_callable 的 PythonOperator 不断执行
PythonOperator with python_callable set gets executed constantly
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from workflow.task import some_task
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['jimin.park1@aig.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
'start_date': airflow.utils.dates.days_ago(0)
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('JiminTest', default_args=default_args, schedule_interval='*/1 * * * *', catchup=False)
t1 = PythonOperator(
task_id='Task1',
provide_context=True,
python_callable=some_task,
dag=dag
)
实际的 some_task 本身只是将时间戳附加到某个文件。正如您在 dag 配置文件中所见,任务本身配置为每 1 分钟 运行。
def some_task(ds, **kwargs):
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open("test.txt", "a") as myfile:
myfile.write(current_time + '\n')
我简单地 tail -f 输出文件并在没有调度程序 运行ning 的情况下启动网络服务器。当网络服务器启动时,这个函数被调用并且东西被附加到文件中。当我启动调度程序时,在每个执行循环中,文件都会被附加。
我想要的是函数按预期每分钟执行一次,而不是每个执行循环。
调度程序将运行每个调度程序循环的每个 DAG 文件,包括所有导入语句。
导入函数的文件中是否有任何 运行ning 代码?
尝试检查配置文件中的 scheduler_heartbeat_sec
配置参数。对于您的情况,它应该小于 60 秒。
如果您希望调度程序不追赶之前的运行,请将 catchup_by_default
设置为 False(不过我不确定这是否与您的问题相关)。
请说明您使用的是哪个 Apache Airflow 版本
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from workflow.task import some_task
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['jimin.park1@aig.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
'start_date': airflow.utils.dates.days_ago(0)
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('JiminTest', default_args=default_args, schedule_interval='*/1 * * * *', catchup=False)
t1 = PythonOperator(
task_id='Task1',
provide_context=True,
python_callable=some_task,
dag=dag
)
实际的 some_task 本身只是将时间戳附加到某个文件。正如您在 dag 配置文件中所见,任务本身配置为每 1 分钟 运行。
def some_task(ds, **kwargs):
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open("test.txt", "a") as myfile:
myfile.write(current_time + '\n')
我简单地 tail -f 输出文件并在没有调度程序 运行ning 的情况下启动网络服务器。当网络服务器启动时,这个函数被调用并且东西被附加到文件中。当我启动调度程序时,在每个执行循环中,文件都会被附加。
我想要的是函数按预期每分钟执行一次,而不是每个执行循环。
调度程序将运行每个调度程序循环的每个 DAG 文件,包括所有导入语句。
导入函数的文件中是否有任何 运行ning 代码?
尝试检查配置文件中的 scheduler_heartbeat_sec
配置参数。对于您的情况,它应该小于 60 秒。
如果您希望调度程序不追赶之前的运行,请将 catchup_by_default
设置为 False(不过我不确定这是否与您的问题相关)。
请说明您使用的是哪个 Apache Airflow 版本