Airflow File Sensor for sensing files on my local drive
Does anyone know about FileSensor? I came across it while looking into sensing files in a local directory. The code is as follows:
task = FileSensor(
    task_id="senseFile",
    filepath="etc/hosts",
    fs_conn_id='fs_local',
    _hook=self.hook,
    dag=self.dag,
)
I have also set my conn_id and the connection type to File (path) and supplied {'path': 'mypath'}, but even when I point it at a path that does not exist, or the file is not at the specified path, the task completes and the DAG succeeds. The FileSensor does not seem to sense files at all.
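For what it's worth, the stock FileSensor joins the filepath you pass it onto the base path stored in the connection's extras, and in some older Airflow releases its poke() walked the resulting path with os.walk, which silently yields nothing for a missing directory, so the sensor returned True regardless; that would match the behaviour described above. Below is a minimal sketch of a setup that should actually wait (or time out) on a missing file, assuming an Airflow 1.x-style install; the connection extras value is an assumption:

# A minimal sketch, assuming a connection named fs_local of type
# "File (path)" with extras {"path": "/"} -- the sensor then polls
# /etc/hosts (base path joined with filepath).
from datetime import datetime
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor

dag = DAG('file_sensor_check', start_date=datetime(2017, 6, 26),
          schedule_interval=None)

sense = FileSensor(
    task_id='senseFile',
    filepath='etc/hosts',
    fs_conn_id='fs_local',
    poke_interval=30,
    timeout=300,  # fail the task instead of poking forever
    dag=dag)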
I found the community-contributed FileSensor somewhat mediocre, so I wrote my own.
It worked on local files where the server/scheduler is running, but I ran into problems when using network paths.
The trick I found for network paths was to mount the network drive to my Linux box.
This is the DAG I use to run sensor_task >> process_task >> archive_task >> trigger a rerun.
NOTE: We use Variables (sourcePath, filePattern and archivePath) entered through the WebGUI; see the sketch after the DAG code for seeding them programmatically.
from airflow import DAG
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator, TriggerDagRunOperator
from datetime import datetime, timedelta
from airflow.models import Variable

default_args = {
    'owner': 'glsam',
    'depends_on_past': False,
    'start_date': datetime(2017, 6, 26),
    'provide_context': True,
    'retries': 100,
    'retry_delay': timedelta(seconds=30)
}

task_name = 'my_first_file_sensor_task'
filepath = Variable.get("sourcePath")
filepattern = Variable.get("filePattern")
archivepath = Variable.get("archivePath")

dag = DAG(
    task_name,
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    concurrency=1)

# Wait until a file matching filepattern shows up in filepath.
sensor_task = OmegaFileSensor(
    task_id=task_name,
    filepath=filepath,
    filepattern=filepattern,
    poke_interval=3,
    dag=dag)

def process_file(**context):
    # The sensor pushes the matched file name to XCom; pull it back here.
    file_to_process = context['task_instance'].xcom_pull(
        key='file_name', task_ids=task_name)
    file = open(filepath + file_to_process, 'w')
    file.write('This is a test\n')
    file.write('of processing the file')
    file.close()

process_task = PythonOperator(
    task_id='process_the_file', python_callable=process_file, dag=dag)

archive_task = ArchiveFileOperator(
    task_id='archive_file',
    filepath=filepath,
    task_name=task_name,
    archivepath=archivepath,
    dag=dag)

# Re-trigger this same DAG so it goes back to waiting for the next file.
trigger = TriggerDagRunOperator(
    task_id='trigger_dag_rerun', trigger_dag_id=task_name, dag=dag)

sensor_task >> process_task >> archive_task >> trigger
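The three Variables the DAG reads can be created under Admin -> Variables in the web UI, or seeded from code. A minimal sketch with placeholder values (note the trailing slashes, since the tasks concatenate these paths with file names directly):

# Seed the Variables the DAG reads -- the values here are placeholders.
from airflow.models import Variable

Variable.set("sourcePath", "/data/incoming/")   # directory the sensor watches
Variable.set("filePattern", r".*\.csv$")        # regex matched against file names
Variable.set("archivePath", "/data/archive/")   # where processed files get moved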
And then here is my FileSensor:
import os
import re

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators.sensors import BaseSensorOperator


class ArchiveFileOperator(BaseOperator):
    @apply_defaults
    def __init__(self, filepath, archivepath, task_name, *args, **kwargs):
        super(ArchiveFileOperator, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.archivepath = archivepath
        self.task_name = task_name

    def execute(self, context):
        # Pull the file name the sensor pushed, then move the file to the archive.
        file_name = context['task_instance'].xcom_pull(self.task_name, key='file_name')
        os.rename(self.filepath + file_name, self.archivepath + file_name)


class OmegaFileSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, filepath, filepattern, *args, **kwargs):
        super(OmegaFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.filepattern = filepattern

    def poke(self, context):
        file_pattern = re.compile(self.filepattern)
        # Scan the directory; succeed on the first file matching the pattern.
        for file_name in os.listdir(self.filepath):
            if file_pattern.match(file_name):
                context['task_instance'].xcom_push('file_name', file_name)
                return True
        return False


class OmegaPlugin(AirflowPlugin):
    name = "omega_plugin"
    operators = [OmegaFileSensor, ArchiveFileOperator]
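Airflow calls poke() every poke_interval seconds until it returns True (or the sensor times out), so the only custom logic here is the directory scan. That scan can be sanity-checked outside Airflow; a minimal sketch with a hypothetical helper mirroring it:

import os
import re

def find_first_match(path, pattern):
    """Mirrors OmegaFileSensor.poke: return the first file name in
    `path` matching `pattern`, or None if nothing matches yet."""
    regex = re.compile(pattern)
    for name in os.listdir(path):
        if regex.match(name):
            return name
    return None

# e.g. find_first_match('/data/incoming/', r'.*\.csv$')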