Why do I get ConnectionResetError when reading and writing from and to s3 using smart_open?
Based on the discussion on , the following code can read from and write back to S3 on the fly:
from smart_open import open
import os

bucket_dir = "s3://my-bucket/annotations/"

with open(os.path.join(bucket_dir, "in.tsv.gz"), "rb") as fin:
    with open(os.path.join(bucket_dir, "out.tsv.gz"), "wb") as fout:
        for line in fin:
            l = [i.strip() for i in line.decode().split("\t")]
            string = "\t".join(l) + "\n"
            fout.write(string.encode())
The problem is that after processing a few thousand lines (a few minutes) I get a "connection reset by peer" error:
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer'))
What can I do? I tried calling fout.flush() after every fout.write(string.encode()), but it does not help. Is there a better solution for processing a .tsv file with roughly 200 million lines?
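For reference, one variant I am experimenting with is passing a retry-configured boto3 client to smart_open through transport_params (a minimal sketch, assuming smart_open >= 5.x and boto3/botocore; the retry numbers are arbitrary):

import boto3
from botocore.config import Config
from smart_open import open

# Adaptive retry mode backs off automatically on connection errors and throttling.
client = boto3.client(
    "s3", config=Config(retries={"max_attempts": 10, "mode": "adaptive"})
)

with open(
    "s3://my-bucket/annotations/in.tsv.gz", "rb", transport_params={"client": client}
) as fin:
    for line in fin:
        ...  # process lines as above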
I implemented a producer-consumer approach on top of smart_open. It mitigates the Connection broken error, but in some cases does not completely resolve it.
import os
import threading
from queue import Queue

from smart_open import open
from tqdm import tqdm


class Producer:
    def __init__(self, queue, bucket_dir, input_file):
        self.queue = queue
        self.bucket_dir = bucket_dir
        self.input_file = input_file

    def run(self):
        with open(os.path.join(self.bucket_dir, self.input_file), "rb") as fin:
            for line in tqdm(fin):
                # put() blocks while the queue is full, so no sleep-polling is needed
                self.queue.put(line.decode())
        self.queue.put("DONE")  # sentinel telling the consumer to stop


class Consumer:
    def __init__(self, queue, bucket_dir, output_file):
        self.queue = queue
        self.bucket_dir = bucket_dir
        self.output_file = output_file

    def run(self):
        to_write = ""
        count = 0
        with open(os.path.join(self.bucket_dir, self.output_file), "wb") as fout:
            while True:
                item = self.queue.get()  # blocks until an item is available
                self.queue.task_done()
                if item == "DONE":
                    fout.write(to_write.encode())  # write out the final partial batch
                    fout.flush()
                    return
                count += 1
                to_write += item
                if count % 256 == 0:  # batch write every 256 lines
                    fout.write(to_write.encode())
                    fout.flush()
                    to_write = ""  # reset the batch buffer


def main(args):
    q = Queue(1024)
    producer = Producer(q, args.bucket_dir, args.input_file)
    producer_thread = threading.Thread(target=producer.run)
    consumer = Consumer(q, args.bucket_dir, args.output_file)
    consumer_thread = threading.Thread(target=consumer.run)
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()
    q.join()
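For completeness, here is how I drive main() (a minimal sketch; the flag names are assumptions that match the attributes main() reads):

import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--bucket_dir", default="s3://my-bucket/annotations/")
    parser.add_argument("--input_file", default="in.tsv.gz")
    parser.add_argument("--output_file", default="out.tsv.gz")
    main(parser.parse_args())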