从 FTP 服务器获取最新的稳定文件列表

Get List of latest stable files from FTP server

我正在尝试获取已经完整上传到 FTP 服务器的文件列表。我可以访问这台 FTP 服务器,第三方每 15 分钟向其中写入一次数据文件和标记文件。数据文件完整上传之后,才会创建对应的标记文件;也就是说,只要标记文件存在,就说明数据文件已经准备就绪,可以下载了。我正在寻找一种高效的方法来解决这个问题:我想每分钟检查一次 FTP 服务器上是否有新的稳定文件,如果有,就下载这些文件。一种首选做法是检查标记文件是否已经存在至少 2 分钟,如果是,就下载该标记文件及其对应的数据文件。我是 Python 新手,希望得到帮助。目前我写的代码只做到了列出文件这一步:

import paramiko
from datetime import datetime, timedelta

# Connection settings for the remote server (placeholder values).
FTP_HOST = 'host_address'
# NOTE(review): 21 is the conventional plain-FTP port, while the code below
# opens an SFTP session through paramiko (SSH, conventionally port 22) -- confirm.
FTP_PORT = 21
FTP_USERNAME = 'username'
FTP_PASSWORD = 'password'
# Remote base directory; show_ftp_files_stat() appends "/<YYYYMMDD>" to it.
FTP_ROOT_PATH = 'path_to_dir'

def today():
    """Return the current local date formatted as a ``YYYYMMDD`` string."""
    local_now = datetime.now()
    return local_now.strftime('%Y%m%d')

def open_ftp_connection(ftp_host, ftp_port, ftp_username, ftp_password):
    """
    Open an SFTP session over SSH and return the client object.

    Args:
        ftp_host: Host name or address of the server.
        ftp_port: TCP port of the SSH service on that host.
        ftp_username: Login name.
        ftp_password: Password for ``ftp_username``.

    Returns:
        A ``paramiko.SFTPClient`` on success, the string ``'conn_error'`` when
        the transport cannot be created, or the string ``'auth_error'`` when
        ``transport.connect`` fails.
    """
    try:
        # The endpoint must be passed as ONE (host, port) tuple: a second
        # positional argument to Transport is its default window size, so
        # passing the port separately would silently ignore it.
        transport = paramiko.Transport((ftp_host, ftp_port))
    except Exception:
        return 'conn_error'
    try:
        transport.connect(username=ftp_username, password=ftp_password)
    except Exception:
        # Do not leave a half-open transport behind when login fails.
        transport.close()
        return 'auth_error'
    return paramiko.SFTPClient.from_transport(transport)

def show_ftp_files_stat():
    """
    Print name, size and modification time of every entry in today's folder.

    The folder is ``FTP_ROOT_PATH + "/" + today()``. The raw attribute list is
    printed first, followed by one line per entry.

    Raises:
        ConnectionError: If ``open_ftp_connection`` did not return a usable
            client (it reports failures by returning a marker string).
    """
    ftp_connection = open_ftp_connection(FTP_HOST, int(FTP_PORT), FTP_USERNAME, FTP_PASSWORD)
    # Failures come back as 'conn_error' / 'auth_error' strings rather than
    # exceptions; fail with a clear message instead of an AttributeError later.
    if ftp_connection is None or isinstance(ftp_connection, str):
        raise ConnectionError('Could not open SFTP session: {}'.format(ftp_connection))

    # Keep a handle on the transport so it can be shut down with the session.
    transport = ftp_connection.get_channel().get_transport()
    try:
        full_ftp_path = FTP_ROOT_PATH + "/" + today()
        file_attr_list = ftp_connection.listdir_attr(full_ftp_path)
        print(file_attr_list)
        for file_attr in file_attr_list:
            print(file_attr.filename, file_attr.st_size, file_attr.st_mtime)
    finally:
        # Always release the session, even when the listing fails.
        ftp_connection.close()
        transport.close()

# Run the listing only when this file is executed as a script.
if __name__ == '__main__':
    show_ftp_files_stat()

示例数据文件名:org-reference-delta-quotes.REF.48C2.20200402.92.1.1.txt.gz;对应的标记文件名:org-reference-delta-quotes.REF.48C2.20200402.92.note.txt.gz

我用“2 分钟稳定规则”解决了我的用例:如果文件的最后修改时间早于当前时间 2 分钟以上(即至少 2 分钟没有再被修改),我就认为它是稳定的。

import logging
import time
from datetime import datetime, timezone
from ftplib import FTP


# Connection settings for the FTP server (placeholder values).
FTP_HOST = 'host_address'
# NOTE(review): FTP_PORT is not passed to get_ftp_connection() in this snippet,
# so ftplib's default port is what actually gets used -- confirm intent.
FTP_PORT = 21
FTP_USERNAME = 'username'
FTP_PASSWORD = 'password'
# Remote base directory; get_list_of_files() appends "/<date>/" to it.
FTP_ROOT_PATH = 'path_to_dir'


# Root logger, restricted to ERROR and above.
logger = logging.getLogger()
logger.setLevel(logging.ERROR)


def today():
    """Return the current UTC date formatted as a ``YYYYMMDD`` string."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime('%Y%m%d')


def current_utc_ts():
    """Return the current time as POSIX epoch seconds (float)."""
    # An aware datetime is required: ``datetime.utcnow()`` is naive, and
    # ``.timestamp()`` treats naive values as local time, which shifts the
    # result by the machine's UTC offset.
    return datetime.now(tz=timezone.utc).timestamp()


def current_utc_ts_minus_120():
    """Return the epoch second (int) lying 120 seconds before the current time."""
    return int(current_utc_ts()) - 120


def yyyymmddhhmmss_string_epoch_ts(dt_string):
    """
    Convert a ``YYYYMMDDHHMMSS`` string to POSIX epoch seconds.

    The string is interpreted as UTC, which is how RFC 3659 defines the MLSD
    ``modify`` fact that this file feeds in. An optional fractional part
    (``YYYYMMDDHHMMSS.sss``) is accepted and truncated.

    Args:
        dt_string: Timestamp text such as ``'20200402120000'``.

    Returns:
        Seconds since the epoch as a float.

    Raises:
        ValueError: If the text does not match the expected format.
    """
    # Drop fractional seconds; callers only use whole seconds.
    whole_seconds = dt_string.split('.', 1)[0]
    parsed = datetime.strptime(whole_seconds, '%Y%m%d%H%M%S')
    # Attach UTC explicitly so the result does not depend on the local zone
    # (``time.mktime`` would have read the fields as local time).
    return parsed.replace(tzinfo=timezone.utc).timestamp()


def get_ftp_connection(ftp_host, ftp_username, ftp_password):
    """
    Create an ``ftplib.FTP`` session for the given host and credentials.

    Returns:
        The ``FTP`` object on success. If constructing it raises, the error is
        printed and logged and the string ``'conn_error'`` is returned instead.
    """
    try:
        return FTP(host=ftp_host, user=ftp_username, passwd=ftp_password)
    except Exception as connect_failure:
        print(connect_failure)
        logger.error(connect_failure)
    return 'conn_error'


def get_list_of_files(ftp_connection, date_to_process):
    """
    List the ``.gz`` / ``.zip`` entries of one dated folder and end the session.

    Args:
        ftp_connection: Open ``ftplib.FTP`` session. It is always terminated
            before this function returns or raises.
        date_to_process: Folder name appended to ``FTP_ROOT_PATH``.

    Returns:
        A list of ``(name, facts)`` tuples as produced by ``FTP.mlsd()``,
        restricted to names ending in ``.gz`` or ``.zip``.

    Raises:
        Whatever ``cwd`` or ``mlsd`` raise (e.g. when the folder is missing).
    """
    full_ftp_path = FTP_ROOT_PATH + "/" + date_to_process + "/"
    try:
        ftp_connection.cwd(full_ftp_path)
        entries = list(ftp_connection.mlsd())
    finally:
        # Ending the session is part of this function's contract, so do it on
        # the error path too. If the polite QUIT fails, close the socket
        # unilaterally so the original error is not masked.
        try:
            ftp_connection.quit()
        except Exception:
            ftp_connection.close()
    entry_list = [entry for entry in entries if entry[0].endswith(('.gz', '.zip'))]
    print('Total file count', len(entry_list))
    return entry_list


def parse_file_list_to_dict(entries):
    """
    Turn ``(name, facts)`` listing tuples into plain dictionaries.

    Each result has the keys ``file_name``, ``server_timestamp`` (the entry's
    ``modify`` fact converted to integer epoch seconds) and ``server_date``
    (the fourth dot-separated field of the file name, e.g. ``20200402`` in
    ``a.REF.48C2.20200402.92.note.txt.gz``).

    Entries that cannot be parsed are logged and skipped, so one malformed
    name does not discard the entries that follow it.

    Args:
        entries: Iterable of ``(name, facts)`` tuples.

    Returns:
        List of dictionaries for the entries that parsed successfully.
    """
    file_dict_list = []
    for line in entries:
        try:
            file_name = line[0]
            server_date = file_name.split(".")[3]
            modify_fact = line[1]['modify']
            server_timestamp = int(yyyymmddhhmmss_string_epoch_ts(modify_fact))
        except (IndexError, KeyError, ValueError):
            # Expected for names with too few fields or a missing/odd timestamp.
            logging.exception('Skipping malformed listing entry: %r', line)
            continue
        except Exception:
            # Keep the never-raises behaviour for anything unforeseen.
            logging.exception('Unexpected error while parsing listing entry: %r', line)
            continue
        file_dict_list.append({
            "file_name": file_name,
            "server_timestamp": server_timestamp,
            "server_date": server_date,
        })
    return file_dict_list


def get_stable_files_dict_list(dict_list):
    """
    Return the entries whose ``server_timestamp`` is older than the cutoff.

    The cutoff is ``current_utc_ts_minus_120()``; entries strictly below it are
    kept. The number of kept entries is printed.

    Args:
        dict_list: Dictionaries carrying a ``server_timestamp`` key.

    Returns:
        A new list with the entries that passed the check.
    """
    # Evaluate the cutoff once so every entry is compared against the same instant.
    cutoff_ts = current_utc_ts_minus_120()
    stable_list = [d for d in dict_list if d['server_timestamp'] < cutoff_ts]
    print('stable file count: {}'.format(len(stable_list)))
    return stable_list


# Script entry point: connect, list today's (UTC) folder, parse the listing and
# keep the entries that pass the stability check.
if __name__ == '__main__':
    ftp_connection = get_ftp_connection(FTP_HOST, FTP_USERNAME, FTP_PASSWORD)
    # get_ftp_connection reports failure by returning the string 'conn_error'.
    if ftp_connection == 'conn_error':
        logger.error('Failed to connect FTP Server!')
    else:
        file_list = get_list_of_files(ftp_connection, today())
        parse_file_list = parse_file_list_to_dict(file_list)
        # NOTE(review): the result is computed but not used further in this snippet.
        stable_file_list = get_stable_files_dict_list(parse_file_list)