将 1M 行 json 行文件分解为单独的 json 文件 - python

Breaking down 1M row jsonlines file into individual json files - python

我正在尝试处理存储在 s3 中的数百个不同的 .jl.gz 格式文件。我需要获取这些文件中每个 1M json 对象的不同部分,并将它们移动到 sql 数据库、mongodb 和 elasticsearch。

这花费的时间太长了。所以,到目前为止,我尝试过的是将所有文件送入 SQS 队列,然后:

1) Running multiple screen sessions on ec2 instances to read through them faster.
    - This worked, but was not ideal because of the manual oversight needed
    - From this, I wondered about a way to do the equivalent of running multiple screen sessions from within python and found multiprocessing module.

多处理模块似乎可以执行我想要的操作,但我将 运行ning 存入内存错误:

OSError: [Errno 12] Cannot allocate memory

为文件中的每一行创建进程或为文件中的每一行创建队列时。请参阅下面创建队列的代码。

from multiprocessing import Process, Lock, Value, Pool, Queue

def create_mp_queue(self, gzf):
    q = Queue()
    for line in gzf:
        q.put(line)
    return q

workers = 2
gzf = gzip.GzipFile(fileobj=f)    
c_queue = create_mp_queue(gzf)

for x in xrange(workers):
    p = Process(target=self.company_to_s3, args=(company_queue,))
    p.start()
    processes.append(p)

for p in processes:
    p.join()   

那么,如何限制队列的大小以免 运行 内存不足?我在 ec2 上,所以我可以增加服务器的大小,但更喜欢足够灵活的解决方案以在任何服务器上实施。

我对使用 python 快速读取大量数据的其他模块、方法、提示、技巧等持开放态度。

我使用了传递给每个处理器的全局计数器,而不是大量的数字队列。