Is there a way to use Spark to load a file in FTP using TLS
I am moving a Python process to Spark. In Python we use ftplib to connect and download a file onto an EC2 instance; once the file is downloaded we upload it to S3. We are transitioning to serverless infrastructure and would like to load the file in Spark via AWS Glue and then move it to S3 with a multipart upload. I have tried running the current code on a larger Glue instance type, but the machine still runs out of memory (the file is 20 GB).
Old Python code:
"""
This script will get the backup file
"""
import sys
from datetime import datetime
import re
import ftplib
from retry import retry
import shutil
from tools.python.s3_functions import s3_upload
from python_scripts.get import *
def get_ftp_connector(path, user, password):
ftp = ftplib.FTP_TLS(path)
ftp.login(user, password)
ftp.prot_p()
return ftp
def get_ftp_files_list(ftp, dir):
ftp.cwd(dir)
files = ftp.nlst()
print(str("-".join(files)))
if "filecompleted.txt" not in files:
print("Failed to find filescompleted.txt file in ftp server.")
raise Exception("Failed to find filescompleted.txt file in ftp server.")
regex_str = 'Backup_File_Mask_Goes_here([\d]{8}).bak'
find_date_regex = re.compile(regex_str)
searched = [(f, find_date_regex.match(f)) for f in files if find_date_regex.match(f)]
searched = \
[(file_name, datetime.strptime(regex_result.groups()[0], '%Y%m%d')) for file_name, regex_result in searched]
searched = sorted(searched, key=lambda elem: elem[1], reverse=True)
if not searched:
print("Failed to find appropriate file in ftp server.")
raise Exception("Failed to find appropriate file in ftp server.")
return searched[0]
class FtpUploadTracker:
size_written = 0
total_size = 0
last_shown_percent = "X"
def __init__(self, total_size, bk_file):
self.total_size = total_size
self.bk_file = bk_file
self.output_file = open(self.bk_file, 'wb')
self.start_time = datetime.now()
def handle(self, block):
self.size_written += len(block)
percent_complete = str(round((self.size_written / self.total_size) * 100, 1))
self.output_file.write(block)
time_elapsed = (datetime.now() - self.start_time).total_seconds()
speed = round(self.size_written / (1000 * 1000 * time_elapsed), 2)
msg = "{percent}% complete @ average speed of {speed}MB/s : total run time {minutes}m".\
format(percent=percent_complete, speed=speed, minutes=round(time_elapsed/60))
if time_elapsed > 600 and speed < 1:
print("Zombie connection, failing dl.")
raise Exception("Zombie connection, failing dl.")
if self.last_shown_percent != percent_complete:
self.last_shown_percent = percent_complete
print(msg)
def close(self):
self.output_file.close()
@retry(tries=4, delay=300)
def retrieve_db():
"""
This function will retrieve via FTP the backup
:return: None
"""
ftp = get_ftp_connector(FTP_PATH, FTP_USER, FTP_PASSWORD)
# return back the most recent entry
file_name, file_date = get_ftp_files_list(ftp, 'database')
file_epoch = (file_date - datetime(1970, 1, 1)).total_seconds()
new_file_name = "backup_{epoch}.bak".format(epoch=str(int(file_epoch)))
if os.path.exists(DATAFILEPATH):
shutil.rmtree(DATAFILEPATH)
if not os.path.exists(DATAFILEPATH):
os.makedirs(DATAFILEPATH)
temp_backup_file_location = os.path.join(DATAFILEPATH + new_file_name)
print("Found file {file_name}, and downloading it to {loc}".
format(file_name=file_name, loc=temp_backup_file_location))
ftp_handler = FtpUploadTracker(ftp.size(file_name), temp_backup_file_location)
ftp.retrbinary("RETR " + file_name, ftp_handler.handle)
ftp.quit()
ftp_handler.close()
print("Finished download. Uploading to S3.")
s3_upload(DATAFILEPATH, new_file_name, bucket, "db_backup")
os.remove(temp_backup_file_location)
def main():
try:
retrieve_db()
except Exception as e:
print("Failed to download backup after 4 tries with error {e}.".format(e=e))
return 1
return 0
if __name__ == "__main__":
rtn = main()
sys.exit(rtn)
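For reference, the multipart-upload step described above does not require buffering the whole backup in memory or on local disk: boto3's upload_fileobj performs a managed multipart upload from any file-like object, so the FTP data connection can be streamed straight into S3. The sketch below is only illustrative and untested against this server; the host, bucket, and credentials are placeholders.

import ftplib
import boto3

# Placeholder connection details -- substitute real values / secrets handling.
FTP_HOST = "ftplocation.com"
FTP_USER = "Username"
FTP_PASSWORD = "password"
S3_BUCKET = "my-backup-bucket"


def stream_ftp_to_s3(remote_file, s3_key):
    """Stream an FTP-over-TLS download directly into an S3 multipart upload."""
    ftp = ftplib.FTP_TLS(FTP_HOST)
    ftp.login(FTP_USER, FTP_PASSWORD)
    ftp.prot_p()  # encrypt the data channel as well

    s3 = boto3.client("s3")
    # transfercmd opens the data connection for RETR; makefile() turns the
    # socket into a file-like object that upload_fileobj can read in chunks,
    # so the 20 GB file never has to fit in memory.
    conn = ftp.transfercmd("RETR " + remote_file)
    try:
        with conn.makefile("rb") as stream:
            s3.upload_fileobj(stream, S3_BUCKET, s3_key)
    finally:
        conn.close()
        ftp.voidresp()  # consume the server's end-of-transfer reply
        ftp.quit()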
New Spark code (work in progress): the username contains a | character, which forces me to encode the URI. When I run the code, the connection is refused. I can connect with the same connection information from Python.
from pyspark import SparkContext
from pyspark import SparkFiles
import urllib.parse

sc = SparkContext()

ftp_path = "ftp://Username:password@ftplocation.com/path_to_file"
# Percent-encode the '|' in the username while leaving the URL delimiters
# ':', '/' and '@' intact so Spark/Hadoop can still parse the URI.
file_path_clean = urllib.parse.quote(ftp_path, safe=":/@")
print(f"file_path_clean: {file_path_clean}")

# addFile fetches the remote file to the driver and ships it to executors.
sc.addFile(file_path_clean)
filename = SparkFiles.get(file_path_clean.split('/')[-1])
print(f"filename: {filename}")

rdd = sc.textFile("file://" + filename)
print("We got past rdd = sc.textFile(file:// + filename)")
rdd.take(10)
rdd.collect()
print(rdd)
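Even with the whole URL quoted, credentials containing other reserved characters such as ':', '@' or '/' will still break the URI, so a more defensive variant is to percent-encode only the userinfo components and then assemble the URL. This is just a sketch of what I mean; the credentials below are made up.

from urllib.parse import quote

# Hypothetical credentials -- the '|' in the username is the character that
# needs escaping; a password containing ':', '@' or '/' would need it too.
user = "some|user"
password = "p@ss/word"
host = "ftplocation.com"
remote_path = "/path_to_file"

# Encode only the credential parts (safe="" escapes every reserved character),
# then compose the URL so its structural delimiters stay unencoded.
ftp_url = "ftp://{u}:{p}@{h}{path}".format(
    u=quote(user, safe=""),
    p=quote(password, safe=""),
    h=host,
    path=remote_path,
)
print(ftp_url)  # ftp://some%7Cuser:p%40ss%2Fword@ftplocation.com/path_to_file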
There are three ways to approach the problem:

1. Use a mounted file system backed by FTP and write to it from Spark.
2. Use a Spark-to-SFTP connector such as spark-sftp (a rough PySpark sketch is included after the Scala code below).
3. Write the files somewhere else with Spark and copy them to SFTP as a separate step. This is the path we took, because of various reliability issues with SFTP and because Spark leaves partial output behind when a write operation fails. We write terabytes to an SFTP endpoint with Scala code similar to what follows; hopefully it helps with your Python work.
import java.io.Closeable
import java.net.{SocketException, SocketTimeoutException, UnknownHostException}
import javax.net.ssl.SSLException

// Note: BlockingRetry, Backoff, Retry, Recovery, Ignored and FInfo are small
// helper types from our own codebase and are not shown here.

/** Defines some high-level operations for interacting with remote file protocols like FTP, SFTP, etc.
  */
trait RemoteFileOperations extends Closeable {
  var backoff: BlockingRetry.Backoff = Backoff.linear(3000)
  var retry: BlockingRetry.Retry = Retry.maxRetries(3)
  var recover: Recovery = recoverable(this)
  var ignore: Ignored = nonRecoverable

  def listFiles(path: String = ""): Seq[FInfo]
  def uploadFile(localPath: String, remoteDirectory: String): Unit
  def downloadFile(localPath: String, remotePath: String): Unit
  def deleteAll(path: String): Unit

  def connect(): Unit = {}
  def disconnect(): Unit = {}
  def reconnect(): Unit = {
    disconnect()
    connect()
  }

  override def close(): Unit = disconnect()

  /** Wraps a block of code and allows it to be retried when [[recoverable()]] conditions
    * are met. [[BlockingRetry.retry()]] is called with the var fields
    * [[backoff]], [[retry]], [[recover]], and [[ignore]], which can all be reconfigured.
    */
  def retryable[A](f: => A): A = {
    BlockingRetry.retry(retry, backoff, recover, ignore) {
      f
    }
  }

  def recoverable(fileOp: RemoteFileOperations): Recovery = {
    case (_: SocketTimeoutException, _: Int) =>
      fileOp.reconnect()
      None
  }

  def nonRecoverable: Ignored = {
    case _: UnknownHostException |
         _: SSLException |
         _: SocketException |
         _: IllegalStateException =>
  }
}

class SSHJClient(host: String, username: String, password: String) extends RemoteFileOperations {
  import net.schmizz.keepalive.KeepAliveProvider
  import net.schmizz.sshj.connection.ConnectionException
  import net.schmizz.sshj.sftp.SFTPClient
  import net.schmizz.sshj.transport.verification.PromiscuousVerifier
  import net.schmizz.sshj.xfer.FileSystemFile
  import net.schmizz.sshj.{DefaultConfig, SSHClient}

  override def listFiles(path: String): Seq[FInfo] = {
    import collection.JavaConverters._
    retryable {
      sftpSession(sftp => {
        sftp.ls(path).asScala
          .filter(f => f.getName != "." && f.getName != "..")
          .map(f => FInfo(f.getPath, f.getParent, f.isDirectory, f.getAttributes.getSize, f.getAttributes.getMtime))
      })
    }
  }

  override def uploadFile(localPath: String, remoteDirectory: String): Unit = {
    retryable {
      sftpSession(sftp => {
        sftp.getFileTransfer.setPreserveAttributes(false)
        sftp.put(new FileSystemFile(localPath), remoteDirectory)
      })
    }
  }

  override def downloadFile(localPath: String, remotePath: String): Unit = {
    retryable {
      sftpSession(sftp => {
        sftp.getFileTransfer.setPreserveAttributes(false)
        sftp.get(remotePath, new FileSystemFile(localPath))
      })
    }
  }

  override def deleteAll(path: String): Unit =
    throw new UnsupportedOperationException("#deleteAll is unsupported for SSHJClient")

  private def sftpSession[A](f: SFTPClient => A): A = {
    val defaultConfig = new DefaultConfig()
    defaultConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE)
    val ssh = new SSHClient(defaultConfig)
    try {
      // This is equivalent to StrictHostKeyChecking=no, which is disabled since we don't usually
      // know the SSH remote host key ahead of time.
      ssh.addHostKeyVerifier(new PromiscuousVerifier())
      ssh.connect(host)
      ssh.authPassword(username, password)
      val sftp = ssh.newSFTPClient()
      try {
        f(sftp)
      } finally {
        sftp.close()
      }
    } finally {
      ssh.disconnect()
    }
  }

  override def recoverable(fileOp: RemoteFileOperations): Recovery = {
    super.recoverable(fileOp).orElse {
      case (e: ConnectionException, _: Int) =>
        println(s"Recovering session from exception: $e")
        None
    }
  }
}
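And for option 2 above, here is a rough PySpark sketch of reading and writing through the spark-sftp connector, based on that library's documented usage; the host, credentials, paths and Maven coordinate are placeholders you would need to adjust for your Spark/Scala version, and note that the connector speaks SFTP rather than FTP over TLS.

from pyspark.sql import SparkSession

# Launched with the connector on the classpath, e.g.:
#   spark-submit --packages com.springml:spark-sftp_2.11:1.1.3 job.py
spark = SparkSession.builder.appName("sftp-example").getOrCreate()

# Read a file straight from the SFTP server into a DataFrame.
df = (spark.read
      .format("com.springml.spark.sftp")
      .option("host", "sftp.example.com")
      .option("username", "user")
      .option("password", "password")
      .option("fileType", "csv")
      .option("inferSchema", "true")
      .load("/remote/path/data.csv"))

# Write the DataFrame back out to the SFTP server as a separate step.
(df.write
 .format("com.springml.spark.sftp")
 .option("host", "sftp.example.com")
 .option("username", "user")
 .option("password", "password")
 .option("fileType", "csv")
 .save("/remote/path/output.csv"))

As far as I recall, the connector stages files through a temporary location on the driver, so a single 20 GB file may still be constrained by driver disk and memory; that is part of why we went with option 3.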