Airflow FileSensor task is not working, always in queued status
I'm trying to sense a file with an Airflow DAG, but my FileSensor task is always stuck in the queued state. I've tried the code sample below. Am I missing something? By the way, my Airflow version is 2.0.1.
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, date, timedelta

import airflow

default_args = {
    "depends_on_past": False,
    "start_date": datetime.now() - timedelta(minutes=10),
}

with airflow.DAG("fs_test_dag", default_args=default_args, schedule_interval="@once") as dag:
    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")
    sensor_task = FileSensor(task_id="file_sensor_task", poke_interval=30, filepath="testfile.csv")

    start_task >> sensor_task >> stop_task
I cannot reproduce the issue. The FileSensor waits for a file in the directory specified in the connection used by the operator. By default that is the ~/ directory. Are you sure the file is being created where you expect it? Right now your sensor is waiting for /testfile.csv. Is that expected?
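To illustrate the point above: a relative filepath on the sensor is resolved against the base path stored in the filesystem connection, so where the sensor actually looks depends on that connection. This is only a rough sketch of the resolution logic (not Airflow's actual implementation), using a hypothetical helper name:

```python
import os.path

def resolve_sensor_path(conn_base_path, filepath):
    # A relative filepath is joined onto the connection's base path;
    # an absolute filepath would win outright, as os.path.join does.
    return os.path.join(conn_base_path, filepath)

# If the connection's path is "/", filepath="testfile.csv" means /testfile.csv:
print(resolve_sensor_path("/", "testfile.csv"))          # → /testfile.csv
# Pointing the connection at another directory changes where the sensor pokes:
print(resolve_sensor_path("/opt/data", "testfile.csv"))  # → /opt/data/testfile.csv
```

So if your file lands somewhere else, either pass an absolute filepath or point the sensor's connection (its fs_conn_id) at the right directory.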
I created a dummy file, ran your DAG, and it ran fine:
root@64299244fb44:/opt/airflow# touch /testfile.csv
root@64299244fb44:/opt/airflow# airflow dags backfill -s 2021-05-25 fs_test_dag
/opt/airflow/airflow/cli/commands/dag_command.py:62 PendingDeprecationWarning: --ignore-first-depends-on-past is deprecated as the value is always set to True
[2021-05-31 20:48:15,940] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags
[2021-05-31 20:48:16,736] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'fs_test_dag', 'start', '2021-05-25T00:00:00+00:00', '--ignore-depends-on-past', '--local', '--pool', 'default_pool', '--subdir', '/files/dags/tasks.py', '--cfg-path', '/tmp/tmp8ttb6fz5']
[2021-05-31 20:48:21,476] {local_executor.py:81} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'fs_test_dag', 'start', '2021-05-25T00:00:00+00:00', '--ignore-depends-on-past', '--local', '--pool', 'default_pool', '--subdir', '/files/dags/tasks.py', '--cfg-path', '/tmp/tmp8ttb6fz5']
[2021-05-31 20:48:21,488] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 2 | succeeded: 0 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 2
[2021-05-31 20:48:21,531] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags/tasks.py
Running <TaskInstance: fs_test_dag.start 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:26,503] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 2 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 2
[2021-05-31 20:48:26,527] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'fs_test_dag', 'file_sensor_task', '2021-05-25T00:00:00+00:00', '--ignore-depends-on-past', '--local', '--pool', 'default_pool', '--subdir', '/files/dags/tasks.py', '--cfg-path', '/tmp/tmpbslxmgxo']
[2021-05-31 20:48:31,503] {local_executor.py:81} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'fs_test_dag', 'file_sensor_task', '2021-05-25T00:00:00+00:00', '--ignore-depends-on-past', '--local', '--pool', 'default_pool', '--subdir', '/files/dags/tasks.py', '--cfg-path', '/tmp/tmpbslxmgxo']
[2021-05-31 20:48:31,516] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-05-31 20:48:31,547] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags/tasks.py
Running <TaskInstance: fs_test_dag.file_sensor_task 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:36,501] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 2 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-05-31 20:48:36,527] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'fs_test_dag', 'stop', '2021-05-25T00:00:00+00:00', '--ignore-depends-on-past', '--local', '--pool', 'default_pool', '--subdir', '/files/dags/tasks.py', '--cfg-path', '/tmp/tmpvoqqs24l']
[2021-05-31 20:48:41,498] {local_executor.py:81} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'fs_test_dag', 'stop', '2021-05-25T00:00:00+00:00', '--ignore-depends-on-past', '--local', '--pool', 'default_pool', '--subdir', '/files/dags/tasks.py', '--cfg-path', '/tmp/tmpvoqqs24l']
[2021-05-31 20:48:41,512] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 2 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-05-31 20:48:41,542] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags/tasks.py
Running <TaskInstance: fs_test_dag.stop 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:46,520] {dagrun.py:444} INFO - Marking run <DagRun fs_test_dag @ 2021-05-25T00:00:00+00:00: backfill__2021-05-25T00:00:00+00:00, externally triggered: False> successful
[2021-05-31 20:48:46,525] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 3 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-05-31 20:48:46,528] {local_executor.py:387} INFO - Shutting down LocalExecutor; waiting for running tasks to finish. Signal again if you don't want to wait.
[2021-05-31 20:48:46,663] {backfill_job.py:831} INFO - Backfill done. Exiting.
Could you share the queue name associated with the worker that is supposed to run this task? I don't see any queue name passed in the task's parameters, so Airflow will expect a queue with the same name as the default_queue defined in airflow.cfg.
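In other words, a task stays queued until some worker listens on the task's effective queue. As a rough illustration of that matching rule (a hypothetical helper, not Airflow's actual code):

```python
def worker_will_pick_up(task_queue, worker_queues, default_queue="default"):
    # A task with no explicit queue falls back to [operators] default_queue
    # from airflow.cfg; a worker only runs tasks on queues it subscribes to.
    effective_queue = task_queue if task_queue is not None else default_queue
    return effective_queue in worker_queues

print(worker_will_pick_up(None, {"default"}))  # worker on the default queue picks it up
print(worker_will_pick_up(None, {"gpu"}))      # no match: the task stays queued forever
```

So check which queue(s) your worker consumes and make sure they match either the task's queue argument or the configured default_queue.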