从 FTP 服务器获取最新的稳定文件列表
Get List of latest stable files from FTP server
我正在尝试获取已完全上传到 FTP 服务器的文件列表。
我可以访问此 FTP 服务器，第三方每 15 分钟向其中写入一次数据文件和标记文件。数据文件完全上传之后，才会创建对应的标记文件。因此，只要标记文件存在，就说明数据文件已经准备就绪，可以下载了。我正在寻找一种高效的方法来解决这个问题：我想每分钟检查一次 FTP 服务器上是否有新的稳定文件，如果有就下载它们。一种首选方案是检查标记文件是否已经存在至少 2 分钟，满足条件后再下载标记文件和对应的数据文件。
我是 python 的新手,正在寻求帮助。
以下是我目前已有的代码，可以列出文件：
import paramiko
from datetime import datetime, timedelta
# Connection settings for the remote server (placeholder values).
FTP_HOST = 'host_address'
# NOTE(review): 21 is the conventional plain-FTP control port, but the code
# below connects through paramiko (SSH/SFTP) -- confirm the intended port.
FTP_PORT = 21
FTP_USERNAME = 'username'
FTP_PASSWORD = 'password'
# Remote base directory; show_ftp_files_stat() appends "/<YYYYMMDD>" to it.
FTP_ROOT_PATH = 'path_to_dir'
def today():
    """Return the current local date formatted as a ``YYYYMMDD`` string."""
    now = datetime.now()
    return now.strftime('%Y%m%d')
def open_ftp_connection(ftp_host, ftp_port, ftp_username, ftp_password):
    """
    Open an SFTP session over SSH and return the connection object.

    Args:
        ftp_host: Host name or IP address of the server.
        ftp_port: TCP port of the SSH/SFTP service.
        ftp_username: Login name.
        ftp_password: Login password.

    Returns:
        A ``paramiko.SFTPClient`` on success, the string ``'conn_error'``
        when the transport cannot be created, or the string ``'auth_error'``
        when ``transport.connect`` fails.
    """
    try:
        # Transport takes ONE socket-like argument, so host and port must be
        # passed together as a tuple. Passing the port as a second positional
        # argument would set ``default_window_size`` instead of the port.
        transport = paramiko.Transport((ftp_host, ftp_port))
    except Exception:
        return 'conn_error'
    try:
        # NOTE(review): no ``hostkey`` is supplied here, so the server's host
        # key is presumably not verified -- confirm this is acceptable.
        transport.connect(username=ftp_username, password=ftp_password)
    except Exception:
        # Release the socket and worker thread of the half-open transport.
        transport.close()
        return 'auth_error'
    return paramiko.SFTPClient.from_transport(transport)
def show_ftp_files_stat():
    """
    Print name, size and modification time of every entry in today's folder.

    The folder is ``FTP_ROOT_PATH/<YYYYMMDD>`` where the date comes from
    ``today()``. If the connection cannot be opened, a message is printed
    and the function returns without listing anything.
    """
    ftp_connection = open_ftp_connection(FTP_HOST, int(FTP_PORT), FTP_USERNAME, FTP_PASSWORD)
    # open_ftp_connection reports failure by returning an error string
    # ('conn_error' / 'auth_error') instead of a connection object.
    if isinstance(ftp_connection, str):
        print('Could not open connection:', ftp_connection)
        return
    full_ftp_path = FTP_ROOT_PATH + "/" + today()
    try:
        file_attr_list = ftp_connection.listdir_attr(full_ftp_path)
        print(file_attr_list)
        for file_attr in file_attr_list:
            print(file_attr.filename, file_attr.st_size, file_attr.st_mtime)
    finally:
        # Always release the session, even when the listing fails.
        ftp_connection.close()
# Script entry point: print today's remote file listing when run directly.
if __name__ == "__main__":
    show_ftp_files_stat()
示例文件名
org-reference-delta-quotes.REF.48C2.20200402.92.1.1.txt.gz
样例对应标记文件名
org-reference-delta-quotes.REF.48C2.20200402.92.note.txt.gz
我用 2 分钟稳定规则解决了我的用例：如果文件的修改时间早于当前时间 2 分钟以上（即最近 2 分钟内没有再被修改），我就认为它是稳定的。
import logging
import time
from datetime import datetime, timezone
from ftplib import FTP
# Connection settings for the FTP server (placeholder values).
FTP_HOST = 'host_address'
# NOTE(review): get_ftp_connection() below takes no port argument, so this
# value does not appear to be used by this script -- confirm.
FTP_PORT = 21
FTP_USERNAME = 'username'
FTP_PASSWORD = 'password'
# Remote base directory; get_list_of_files() appends "/<date>/" to it.
FTP_ROOT_PATH = 'path_to_dir'
# Root logger, with its level set to ERROR.
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
def today():
    """Return the current UTC date formatted as a ``YYYYMMDD`` string."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime('%Y%m%d')
def current_utc_ts():
    """Return the current time as seconds since the Unix epoch (float)."""
    # An aware datetime is required here: datetime.utcnow() is naive, and
    # .timestamp() on a naive value treats its fields as LOCAL time, which
    # skews the result by the machine's UTC offset.
    return datetime.now(tz=timezone.utc).timestamp()


def current_utc_ts_minus_120():
    """Return the epoch second (int) lying 120 seconds before now."""
    return int(datetime.now(tz=timezone.utc).timestamp()) - 120


def yyyymmddhhmmss_string_epoch_ts(dt_string):
    """
    Convert a UTC ``YYYYMMDDHHMMSS`` string to seconds since the Unix epoch.

    Args:
        dt_string: Timestamp text such as ``'20200402101500'``. An optional
            fractional part (``'20200402101500.123'``) is accepted and
            truncated.

    Returns:
        Epoch seconds as a float.

    Raises:
        ValueError: If the text does not match ``%Y%m%d%H%M%S``.
    """
    # The fields are interpreted as UTC so the result is comparable with
    # current_utc_ts_minus_120(); time.mktime() would read them as local time.
    whole_seconds = dt_string.partition('.')[0]
    parsed = datetime.strptime(whole_seconds, '%Y%m%d%H%M%S')
    return parsed.replace(tzinfo=timezone.utc).timestamp()
def get_ftp_connection(ftp_host, ftp_username, ftp_password):
    """
    Connect and log in to the FTP server.

    Returns:
        The ``ftplib.FTP`` object on success. On any exception the error is
        printed and logged, and the string ``'conn_error'`` is returned.
    """
    try:
        connection = FTP(ftp_host, ftp_username, ftp_password)
    except Exception as error:
        print(error)
        logger.error(error)
        return 'conn_error'
    else:
        return connection
def get_list_of_files(ftp_connection, date_to_process):
    """
    List the ``.gz`` and ``.zip`` entries of one dated directory.

    Args:
        ftp_connection: An open ``ftplib.FTP`` connection. It is closed with
            ``quit()`` before this function returns or raises.
        date_to_process: Directory name appended to ``FTP_ROOT_PATH``.

    Returns:
        A list of ``(name, facts)`` tuples as produced by ``FTP.mlsd()``,
        limited to names ending in ``.gz`` or ``.zip``.
    """
    full_ftp_path = FTP_ROOT_PATH + "/" + date_to_process + "/"
    try:
        ftp_connection.cwd(full_ftp_path)
        entry_list = [
            entry for entry in ftp_connection.mlsd()
            if entry[0].endswith(('.gz', '.zip'))
        ]
    finally:
        # Close the session even when cwd/mlsd fails (e.g. missing directory).
        ftp_connection.quit()
    print('Total file count', len(entry_list))
    return entry_list
def parse_file_list_to_dict(entries):
    """
    Convert MLSD entries into a list of plain dictionaries.

    Args:
        entries: Iterable of ``(name, facts)`` pairs; ``facts['modify']`` is
            parsed with ``yyyymmddhhmmss_string_epoch_ts``.

    Returns:
        A list of dicts with keys ``file_name``, ``server_timestamp`` (int
        epoch seconds) and ``server_date`` (fourth dot-separated part of the
        file name). Entries that cannot be parsed are logged and skipped, so
        one malformed entry no longer discards the entries that follow it.
    """
    file_dict_list = []
    for line in entries:
        try:
            file_name = line[0]
            file_dict = {
                "file_name": file_name,
                "server_timestamp": int(yyyymmddhhmmss_string_epoch_ts(line[1]['modify'])),
                "server_date": file_name.split(".")[3],
            }
        except IndexError:
            # Expected for names with fewer than four dot-separated parts.
            logging.exception("Skipping entry with an unexpected file name: %r", line)
        except Exception:
            # Unexpected failure (missing 'modify' fact, bad timestamp, ...).
            logging.exception("Skipping entry that could not be parsed: %r", line)
        else:
            file_dict_list.append(file_dict)
    return file_dict_list
def get_stable_files_dict_list(dict_list):
    """
    Keep only the entries last modified more than 120 seconds ago.

    Args:
        dict_list: Dicts carrying an integer ``server_timestamp`` key.

    Returns:
        A new list with the entries whose ``server_timestamp`` is strictly
        below the cutoff returned by ``current_utc_ts_minus_120()``.
    """
    # Compute the cutoff once so every entry is judged against the same
    # instant, instead of re-reading the clock for each element.
    cutoff = current_utc_ts_minus_120()
    stable_list = [entry for entry in dict_list if entry['server_timestamp'] < cutoff]
    print('stable file count: {}'.format(len(stable_list)))
    return stable_list
# Script entry point: connect, list today's directory, keep the stable files.
if __name__ == '__main__':
    ftp_connection = get_ftp_connection(FTP_HOST, FTP_USERNAME, FTP_PASSWORD)
    if ftp_connection != 'conn_error':
        file_list = get_list_of_files(ftp_connection, today())
        parse_file_list = parse_file_list_to_dict(file_list)
        stable_file_list = get_stable_files_dict_list(parse_file_list)
    else:
        logger.error('Failed to connect FTP Server!')
我正在尝试获取已完全上传到 FTP 服务器的文件列表。 我可以访问此 FTP 服务器，第三方每 15 分钟向其中写入一次数据文件和标记文件。数据文件完全上传之后，才会创建对应的标记文件。因此，只要标记文件存在，就说明数据文件已经准备就绪，可以下载了。我正在寻找一种高效的方法来解决这个问题：我想每分钟检查一次 FTP 服务器上是否有新的稳定文件，如果有就下载它们。一种首选方案是检查标记文件是否已经存在至少 2 分钟，满足条件后再下载标记文件和对应的数据文件。 我是 Python 新手，正在寻求帮助。 以下是我目前已有的代码，可以列出文件：
import paramiko
from datetime import datetime, timedelta
# Connection settings for the remote server (placeholder values).
FTP_HOST = 'host_address'
# NOTE(review): 21 is the conventional plain-FTP control port, but the code
# below connects through paramiko (SSH/SFTP) -- confirm the intended port.
FTP_PORT = 21
FTP_USERNAME = 'username'
FTP_PASSWORD = 'password'
# Remote base directory; show_ftp_files_stat() appends "/<YYYYMMDD>" to it.
FTP_ROOT_PATH = 'path_to_dir'
def today():
    """Return the current local date formatted as a ``YYYYMMDD`` string."""
    now = datetime.now()
    return now.strftime('%Y%m%d')
def open_ftp_connection(ftp_host, ftp_port, ftp_username, ftp_password):
    """
    Open an SFTP session over SSH and return the connection object.

    Args:
        ftp_host: Host name or IP address of the server.
        ftp_port: TCP port of the SSH/SFTP service.
        ftp_username: Login name.
        ftp_password: Login password.

    Returns:
        A ``paramiko.SFTPClient`` on success, the string ``'conn_error'``
        when the transport cannot be created, or the string ``'auth_error'``
        when ``transport.connect`` fails.
    """
    try:
        # Transport takes ONE socket-like argument, so host and port must be
        # passed together as a tuple. Passing the port as a second positional
        # argument would set ``default_window_size`` instead of the port.
        transport = paramiko.Transport((ftp_host, ftp_port))
    except Exception:
        return 'conn_error'
    try:
        # NOTE(review): no ``hostkey`` is supplied here, so the server's host
        # key is presumably not verified -- confirm this is acceptable.
        transport.connect(username=ftp_username, password=ftp_password)
    except Exception:
        # Release the socket and worker thread of the half-open transport.
        transport.close()
        return 'auth_error'
    return paramiko.SFTPClient.from_transport(transport)
def show_ftp_files_stat():
    """
    Print name, size and modification time of every entry in today's folder.

    The folder is ``FTP_ROOT_PATH/<YYYYMMDD>`` where the date comes from
    ``today()``. If the connection cannot be opened, a message is printed
    and the function returns without listing anything.
    """
    ftp_connection = open_ftp_connection(FTP_HOST, int(FTP_PORT), FTP_USERNAME, FTP_PASSWORD)
    # open_ftp_connection reports failure by returning an error string
    # ('conn_error' / 'auth_error') instead of a connection object.
    if isinstance(ftp_connection, str):
        print('Could not open connection:', ftp_connection)
        return
    full_ftp_path = FTP_ROOT_PATH + "/" + today()
    try:
        file_attr_list = ftp_connection.listdir_attr(full_ftp_path)
        print(file_attr_list)
        for file_attr in file_attr_list:
            print(file_attr.filename, file_attr.st_size, file_attr.st_mtime)
    finally:
        # Always release the session, even when the listing fails.
        ftp_connection.close()
# Script entry point: print today's remote file listing when run directly.
if __name__ == "__main__":
    show_ftp_files_stat()
示例文件名
org-reference-delta-quotes.REF.48C2.20200402.92.1.1.txt.gz
样例对应标记文件名
org-reference-delta-quotes.REF.48C2.20200402.92.note.txt.gz
我用 2 分钟稳定规则解决了我的用例：如果文件的修改时间早于当前时间 2 分钟以上（即最近 2 分钟内没有再被修改），我就认为它是稳定的。
import logging
import time
from datetime import datetime, timezone
from ftplib import FTP
# Connection settings for the FTP server (placeholder values).
FTP_HOST = 'host_address'
# NOTE(review): get_ftp_connection() below takes no port argument, so this
# value does not appear to be used by this script -- confirm.
FTP_PORT = 21
FTP_USERNAME = 'username'
FTP_PASSWORD = 'password'
# Remote base directory; get_list_of_files() appends "/<date>/" to it.
FTP_ROOT_PATH = 'path_to_dir'
# Root logger, with its level set to ERROR.
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
def today():
    """Return the current UTC date formatted as a ``YYYYMMDD`` string."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime('%Y%m%d')
def current_utc_ts():
    """Return the current time as seconds since the Unix epoch (float)."""
    # An aware datetime is required here: datetime.utcnow() is naive, and
    # .timestamp() on a naive value treats its fields as LOCAL time, which
    # skews the result by the machine's UTC offset.
    return datetime.now(tz=timezone.utc).timestamp()


def current_utc_ts_minus_120():
    """Return the epoch second (int) lying 120 seconds before now."""
    return int(datetime.now(tz=timezone.utc).timestamp()) - 120


def yyyymmddhhmmss_string_epoch_ts(dt_string):
    """
    Convert a UTC ``YYYYMMDDHHMMSS`` string to seconds since the Unix epoch.

    Args:
        dt_string: Timestamp text such as ``'20200402101500'``. An optional
            fractional part (``'20200402101500.123'``) is accepted and
            truncated.

    Returns:
        Epoch seconds as a float.

    Raises:
        ValueError: If the text does not match ``%Y%m%d%H%M%S``.
    """
    # The fields are interpreted as UTC so the result is comparable with
    # current_utc_ts_minus_120(); time.mktime() would read them as local time.
    whole_seconds = dt_string.partition('.')[0]
    parsed = datetime.strptime(whole_seconds, '%Y%m%d%H%M%S')
    return parsed.replace(tzinfo=timezone.utc).timestamp()
def get_ftp_connection(ftp_host, ftp_username, ftp_password):
    """
    Connect and log in to the FTP server.

    Returns:
        The ``ftplib.FTP`` object on success. On any exception the error is
        printed and logged, and the string ``'conn_error'`` is returned.
    """
    try:
        connection = FTP(ftp_host, ftp_username, ftp_password)
    except Exception as error:
        print(error)
        logger.error(error)
        return 'conn_error'
    else:
        return connection
def get_list_of_files(ftp_connection, date_to_process):
    """
    List the ``.gz`` and ``.zip`` entries of one dated directory.

    Args:
        ftp_connection: An open ``ftplib.FTP`` connection. It is closed with
            ``quit()`` before this function returns or raises.
        date_to_process: Directory name appended to ``FTP_ROOT_PATH``.

    Returns:
        A list of ``(name, facts)`` tuples as produced by ``FTP.mlsd()``,
        limited to names ending in ``.gz`` or ``.zip``.
    """
    full_ftp_path = FTP_ROOT_PATH + "/" + date_to_process + "/"
    try:
        ftp_connection.cwd(full_ftp_path)
        entry_list = [
            entry for entry in ftp_connection.mlsd()
            if entry[0].endswith(('.gz', '.zip'))
        ]
    finally:
        # Close the session even when cwd/mlsd fails (e.g. missing directory).
        ftp_connection.quit()
    print('Total file count', len(entry_list))
    return entry_list
def parse_file_list_to_dict(entries):
    """
    Convert MLSD entries into a list of plain dictionaries.

    Args:
        entries: Iterable of ``(name, facts)`` pairs; ``facts['modify']`` is
            parsed with ``yyyymmddhhmmss_string_epoch_ts``.

    Returns:
        A list of dicts with keys ``file_name``, ``server_timestamp`` (int
        epoch seconds) and ``server_date`` (fourth dot-separated part of the
        file name). Entries that cannot be parsed are logged and skipped, so
        one malformed entry no longer discards the entries that follow it.
    """
    file_dict_list = []
    for line in entries:
        try:
            file_name = line[0]
            file_dict = {
                "file_name": file_name,
                "server_timestamp": int(yyyymmddhhmmss_string_epoch_ts(line[1]['modify'])),
                "server_date": file_name.split(".")[3],
            }
        except IndexError:
            # Expected for names with fewer than four dot-separated parts.
            logging.exception("Skipping entry with an unexpected file name: %r", line)
        except Exception:
            # Unexpected failure (missing 'modify' fact, bad timestamp, ...).
            logging.exception("Skipping entry that could not be parsed: %r", line)
        else:
            file_dict_list.append(file_dict)
    return file_dict_list
def get_stable_files_dict_list(dict_list):
    """
    Keep only the entries last modified more than 120 seconds ago.

    Args:
        dict_list: Dicts carrying an integer ``server_timestamp`` key.

    Returns:
        A new list with the entries whose ``server_timestamp`` is strictly
        below the cutoff returned by ``current_utc_ts_minus_120()``.
    """
    # Compute the cutoff once so every entry is judged against the same
    # instant, instead of re-reading the clock for each element.
    cutoff = current_utc_ts_minus_120()
    stable_list = [entry for entry in dict_list if entry['server_timestamp'] < cutoff]
    print('stable file count: {}'.format(len(stable_list)))
    return stable_list
# Script entry point: connect, list today's directory, keep the stable files.
if __name__ == '__main__':
    ftp_connection = get_ftp_connection(FTP_HOST, FTP_USERNAME, FTP_PASSWORD)
    if ftp_connection != 'conn_error':
        file_list = get_list_of_files(ftp_connection, today())
        parse_file_list = parse_file_list_to_dict(file_list)
        stable_file_list = get_stable_files_dict_list(parse_file_list)
    else:
        logger.error('Failed to connect FTP Server!')