气流远程文件传感器
Airflow Remote file sensor
我正在尝试查找远程服务器中是否有任何文件与提供的模式匹配。类似于以下解决方案
我将 SSHOperator 与 bash 命令一起使用,如下所示,
SSH_Bash = """
echo 'poking for files...'
ls /home/files/test.txt
if [ $? -eq "0" ]; then
echo 'Found file'
else
echo 'failed to find'
fi
"""
t1 = SSHOperator(
ssh_conn_id='ssh_default',
task_id='test_ssh_operator',
command=SSH_Bash,
dag=dag)
它有效,但看起来不像是最佳解决方案。有人可以帮助我获得比 Bash 脚本更好的解决方案来检测远程服务器中的文件。
我尝试了下面的 sftp 传感器,
import os
import re
import logging
from paramiko import SFTP_NO_SUCH_FILE
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
class SFTPSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, filepath,filepattern, sftp_conn_id='sftp_default', *args, **kwargs):
super(SFTPSensor, self).__init__(*args, **kwargs)
self.filepath = filepath
self.filepattern = filepattern
self.hook = SFTPHook(sftp_conn_id)
def poke(self, context):
full_path = self.filepath
file_pattern = re.compile(self.filepattern)
try:
directory = os.listdir(self.hook.full_path)
for files in directory:
if not re.match(file_pattern, files):
self.log.info(files)
self.log.info(file_pattern)
else:
context["task_instance"].xcom_push("file_name", files)
return True
return False
except IOError as e:
if e.errno != SFTP_NO_SUCH_FILE:
raise e
return False
class SFTPSensorPlugin(AirflowPlugin):
name = "sftp_sensor"
sensors = [SFTPSensor]
但这总是戳到本地机器而不是远程机器。有人可以帮助我在哪里我犯了错误。
我替换了
中的行
directory = os.listdir(self.hook.full_path)
到
directory = self.hook.list_directory(full_path)
我正在尝试查找远程服务器中是否有任何文件与提供的模式匹配。类似于以下解决方案
我将 SSHOperator 与 bash 命令一起使用,如下所示,
SSH_Bash = """
echo 'poking for files...'
ls /home/files/test.txt
if [ $? -eq "0" ]; then
echo 'Found file'
else
echo 'failed to find'
fi
"""
t1 = SSHOperator(
ssh_conn_id='ssh_default',
task_id='test_ssh_operator',
command=SSH_Bash,
dag=dag)
它有效,但看起来不像是最佳解决方案。有人可以帮助我获得比 Bash 脚本更好的解决方案来检测远程服务器中的文件。
我尝试了下面的 sftp 传感器,
import os
import re
import logging
from paramiko import SFTP_NO_SUCH_FILE
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
class SFTPSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, filepath,filepattern, sftp_conn_id='sftp_default', *args, **kwargs):
super(SFTPSensor, self).__init__(*args, **kwargs)
self.filepath = filepath
self.filepattern = filepattern
self.hook = SFTPHook(sftp_conn_id)
def poke(self, context):
full_path = self.filepath
file_pattern = re.compile(self.filepattern)
try:
directory = os.listdir(self.hook.full_path)
for files in directory:
if not re.match(file_pattern, files):
self.log.info(files)
self.log.info(file_pattern)
else:
context["task_instance"].xcom_push("file_name", files)
return True
return False
except IOError as e:
if e.errno != SFTP_NO_SUCH_FILE:
raise e
return False
class SFTPSensorPlugin(AirflowPlugin):
name = "sftp_sensor"
sensors = [SFTPSensor]
但这总是戳到本地机器而不是远程机器。有人可以帮助我在哪里我犯了错误。
我替换了
中的行directory = os.listdir(self.hook.full_path)
到
directory = self.hook.list_directory(full_path)