逐行读取 CSV FTP,而不将整个文件存储在 memory/disk 中
Read CSV over FTP line by line without storing the whole file in memory/disk
我被管道卡住了 ftplib.FTP.retrlines
to csv.reader
...
FTP.retrlines
重复调用其中包含一行的回调,而 csv.reader
期望迭代器在每次调用其 __next__()
方法时 return 是一个字符串。
如何将这两件事结合在一起,以便我可以读取和处理文件,而无需提前读取整个文件,例如将其存储在一个文件中。 io.TextIOWrapper
?
我的问题是 FTP.retrlines
不会 return 直到它消耗了整个文件...
我不确定是否有更好的解决方案,但您可以使用可迭代的类似队列的对象将 FTP.retrlines
和 csv.reader
粘合在一起。由于这两个函数是同步的,因此您必须 运行 在不同的线程上并行处理它们。
像这样:
from queue import Queue
from ftplib import FTP
from threading import Thread
import csv
ftp = FTP(host)
ftp.login(username, password)
class LineQueue:
_queue = Queue(10)
def add(self, s):
print(f"Queueing line {s}")
self._queue.put(s)
print(f"Queued line {s}")
def done(self):
print("Signaling Done")
self._queue.put(False)
print("Signaled Done")
def __iter__(self):
print("Reading lines")
while True:
print("Reading line")
s = self._queue.get()
if s == False:
print("Read all lines")
break
print(f"Read line {s}")
yield s
q = LineQueue()
def download():
ftp.retrlines("RETR /path/data.csv", q.add)
q.done()
thread = Thread(target=download)
thread.start()
print("Reading CSV")
for entry in csv.reader(q):
print(entry)
print("Read CSV")
thread.join()
直接与 , just saved some line of code subclassing queue.Queue
相同的解决方案。
from queue import Queue
from ftplib import FTP
from threading import Thread
import csv
ftp = FTP(**ftp_credentials)
class LineQueue(Queue):
def __iter__(self):
while True:
s = self.get()
if s is None:
break
yield s
def __call__(self):
ftp.retrlines(f"RETR {fname}", self.put)
self.put(None)
q = LineQueue(10)
thread = Thread(target=q)
thread.start()
for entry in csv.reader(q):
print(entry)
thread.join()
我被管道卡住了 ftplib.FTP.retrlines
to csv.reader
...
FTP.retrlines
重复调用其中包含一行的回调,而 csv.reader
期望迭代器在每次调用其 __next__()
方法时 return 是一个字符串。
如何将这两件事结合在一起,以便我可以读取和处理文件,而无需提前读取整个文件,例如将其存储在一个文件中。 io.TextIOWrapper
?
我的问题是 FTP.retrlines
不会 return 直到它消耗了整个文件...
我不确定是否有更好的解决方案,但您可以使用可迭代的类似队列的对象将 FTP.retrlines
和 csv.reader
粘合在一起。由于这两个函数是同步的,因此您必须 运行 在不同的线程上并行处理它们。
像这样:
from queue import Queue
from ftplib import FTP
from threading import Thread
import csv
ftp = FTP(host)
ftp.login(username, password)
class LineQueue:
_queue = Queue(10)
def add(self, s):
print(f"Queueing line {s}")
self._queue.put(s)
print(f"Queued line {s}")
def done(self):
print("Signaling Done")
self._queue.put(False)
print("Signaled Done")
def __iter__(self):
print("Reading lines")
while True:
print("Reading line")
s = self._queue.get()
if s == False:
print("Read all lines")
break
print(f"Read line {s}")
yield s
q = LineQueue()
def download():
ftp.retrlines("RETR /path/data.csv", q.add)
q.done()
thread = Thread(target=download)
thread.start()
print("Reading CSV")
for entry in csv.reader(q):
print(entry)
print("Read CSV")
thread.join()
直接与 queue.Queue
相同的解决方案。
from queue import Queue
from ftplib import FTP
from threading import Thread
import csv
ftp = FTP(**ftp_credentials)
class LineQueue(Queue):
def __iter__(self):
while True:
s = self.get()
if s is None:
break
yield s
def __call__(self):
ftp.retrlines(f"RETR {fname}", self.put)
self.put(None)
q = LineQueue(10)
thread = Thread(target=q)
thread.start()
for entry in csv.reader(q):
print(entry)
thread.join()