Why do I get ConnectionResetError when reading from and writing to S3 with smart_open?

Following an earlier discussion, the following code should read from and write back to S3 on the fly:

from smart_open import open
import os

bucket_dir = "s3://my-bucket/annotations/"

# smart_open transparently gzip-(de)compresses based on the .gz extension
with open(os.path.join(bucket_dir, "in.tsv.gz"), "rb") as fin:
    with open(os.path.join(bucket_dir, "out.tsv.gz"), "wb") as fout:
        for line in fin:
            # strip each tab-separated field, then write the line back out
            l = [i.strip() for i in line.decode().split("\t")]
            string = "\t".join(l) + "\n"
            fout.write(string.encode())

The problem is that after a few thousand lines (a few minutes of processing) I get a "Connection reset by peer" error:

    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer'))

What can I do? I tried calling fout.flush() after every fout.write(string.encode()), but it did not help. Is there a better way to handle a .tsv file with roughly 200 million lines?
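Concretely, the flush attempt was just the same loop with a flush added after every write:

for line in fin:
    l = [i.strip() for i in line.decode().split("\t")]
    string = "\t".join(l) + "\n"
    fout.write(string.encode())
    fout.flush()  # flushing after every line did not prevent the resets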

I implemented a producer-consumer approach on top of smart_open, shown below. It makes the "Connection broken" error less frequent, but in some cases it does not go away entirely.

import os
import threading
from queue import Queue

from smart_open import open
from tqdm import tqdm


class Producer:
    def __init__(self, queue, bucket_dir, input_file):
        self.queue = queue
        self.bucket_dir = bucket_dir
        self.input_file = input_file

    def run(self):
        with open(os.path.join(self.bucket_dir, self.input_file), "rb") as fin:
            for line in tqdm(fin):
                # normalize the line exactly as in the simple version above
                line_to_write = "\t".join(i.strip() for i in line.decode().split("\t")) + "\n"
                self.queue.put(line_to_write)  # blocks while the queue is full
        self.queue.put("DONE")  # sentinel telling the consumer to finish


class Consumer:
    def __init__(self, queue, bucket_dir, output_file):
        self.queue = queue
        self.bucket_dir = bucket_dir
        self.output_file = output_file

    def run(self):
        to_write = ""
        count = 0
        with open(os.path.join(self.bucket_dir, self.output_file), "wb") as fout:
            while True:
                item = self.queue.get()  # blocks while the queue is empty
                if item == "DONE":
                    fout.write(to_write.encode())  # write the final partial batch
                    fout.flush()
                    self.queue.task_done()
                    return

                count += 1
                to_write += item
                if count % 256 == 0:  # batch write
                    fout.write(to_write.encode())
                    fout.flush()
                    to_write = ""  # reset the batch so lines are not written twice
                self.queue.task_done()  # needed for q.join() in main()

def main(args):
    q = Queue(1024)

    producer = Producer(q, args.bucket_dir, args.input_file)
    producer_thread = threading.Thread(target=producer.run)

    consumer = Consumer(q, args.bucket_dir, args.output_file)
    consumer_thread = threading.Thread(target=consumer.run)

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()
    q.join()
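
For completeness, main(args) above assumes an argument parser along these lines (a minimal sketch: the flag names mirror the attributes the code reads, and the defaults reusing the paths from the first snippet are only illustrative):

import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--bucket_dir", default="s3://my-bucket/annotations/")
    parser.add_argument("--input_file", default="in.tsv.gz")
    parser.add_argument("--output_file", default="out.tsv.gz")
    main(parser.parse_args())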