Python: Read a huge file line by line and send it to multiprocessing or threads

I have been trying to get my code to work for days now and I am getting desperate. I have searched all over the web, but I still can't find a solution.

I have a 9 GB text file encoded in "latin-1" -> 737,022,387 lines, each line containing a single string.

I want to read each line and send it in an HTTP PUT request that waits for the response, and returns TRUE or FALSE depending on whether the response is 200 or 400. A PUT request takes roughly 1 to 3 seconds, so to speed up processing I want to use threading or multiprocessing.
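
For reference, the real per-line worker would look roughly like the sketch below (a minimal sketch, assuming the requests library; the endpoint URL is only a placeholder, all of the snippets that follow simulate it with a sleep):

import requests

ENDPOINT = "http://example.com/api"  # placeholder URL, not the real service

def process_line(line):
    # Send the line in a PUT request and map 200 -> True, anything else -> False.
    resp = requests.put(ENDPOINT, data=line.strip().encode("latin-1"))
    return resp.status_code == 200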

To start with, I simulate my PUT request with a 3-second sleep. I can't even get that to work.

This code splits my strings into individual characters, and I don't know why...

from multiprocessing import Pool
from time import sleep


def process_line(line):
   sleep(3)
   print(line)
   return True

if __name__ == "__main__":
    pool = Pool(2)
    peon =  open(r'D:\txtFile',encoding="latin-1")
    for line in peon:
        res = pool.map(process_line,line )
        print(res)

This one gives the error: TypeError: process_line() takes 1 positional argument but 17 were given

import multiprocessing
from multiprocessing import Pool
from time import sleep


def process_line(line):
   sleep(3)
   print(line)
   return True

if __name__ == "__main__":
    pool = Pool(2)
    with open(r"d:\txtFile",encoding="latin-1") as file:
        res = pool.apply(process_line,file.readline() )
        print(res)

And this one crashes the computer:

from multiprocessing import Pool
from time import sleep


def process_line(line):
   sleep(3)
   print(line)
   return True

if __name__ == "__main__":
    pool = Pool(2)
    peon =  open(r'D:\txtFile',encoding="latin-1")
    for line in peon:
        res = pool.map(process_line,peon )
        print(res)

Granted, the task itself looks impractical: firing 737,022,387 requests! Work out how many months that would take on a single machine!

Still, the better way to accomplish this task is to read lines from the file in a separate thread and insert them into a queue, and then consume that queue with multiple processes.

Solution 1:

from multiprocessing import Queue, Process
from threading import Thread
from time import sleep

urls_queue = Queue()
max_process = 4

def read_urls():
    with open('urls_file.txt', 'r') as f:
        for url in f:
            urls_queue.put(url.strip())
            print('put url: {}'.format(url.strip()))

    # put DONE to tell send_request_processor to exit
    for i in range(max_process):
        urls_queue.put("DONE")


def send_request(url):
    print('send request: {}'.format(url))
    sleep(1)
    print('recv response: {}'.format(url))


def send_request_processor():
    print('start send request processor')
    while True:
        url = urls_queue.get()
        if url == "DONE":
            break
        else:
            send_request(url)


def main():
    file_reader_thread = Thread(target=read_urls)
    file_reader_thread.start()

    procs = []
    for i in range(max_process):
        p = Process(target=send_request_processor)
        procs.append(p)
        p.start()

    for p in procs:
        p.join()

    print('all done')
    # wait for all tasks in the queue
    file_reader_thread.join()


if __name__ == '__main__':
    main()

Demo: https://onlinegdb.com/Elfo5bGFz

Solution 2:

You can use the tornado asynchronous networking library:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    with open('urls_file.txt', 'r') as f:
        for url in f:
            await q.put(url)
            print('Put %s' % url)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumer to finish all tasks.
    print('Done')
    # producer and consumer can run in parallel

IOLoop.current().run_sync(main)
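
With a single consumer the queue items are still processed one at a time; since each request takes a few seconds, several consumer coroutines can be spawned so the requests overlap. A sketch of an alternative main under the same assumptions (the count of 4 is arbitrary):

async def main():
    # Start several consumers without waiting (they never finish on their own).
    n_consumers = 4  # arbitrary example value
    for _ in range(n_consumers):
        IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumers to finish all tasks.
    print('Done')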

Using the multiprocessing.pool.imap method is a step in the right direction, but the problem is that with so much input the task queue is fed faster than the pool processes can take tasks off it and return results. Consequently the task queue keeps growing and you will run out of memory. What is needed is a way to "throttle" imap so that it blocks once the task queue holds N tasks. I think a reasonable default value for N is twice the pool size, which ensures that when a pool process finishes a task there is no delay in finding another one to work on. So we create the classes BoundedQueueProcessPool (multiprocessing) and BoundedQueueThreadPool (multithreading):

import multiprocessing.pool
import multiprocessing
import threading


class ImapResult():
    def __init__(self, semaphore, result):
        self._semaphore = semaphore
        self.it = result.__iter__()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            elem = self.it.__next__()
            self._semaphore.release()
            return elem
        except StopIteration:
            raise
        except:
            self._semaphore.release()
            raise

class BoundedQueuePool:
    def __init__(self, limit, semaphore):
        self._limit = limit
        self._semaphore = semaphore

    def release(self, result, callback=None):
        self._semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        self._semaphore.acquire()
        callback_fn = self.release if callback is None else lambda result: self.release(result, callback=callback)
        error_callback_fn = self.release if error_callback is None else lambda result: self.release(result, callback=error_callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)

    def imap(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self._semaphore.acquire()
                yield elem
        if chunksize > self._limit:
            raise ValueError(f'chunksize argument exceeds {self._limit}')
        result = super().imap(func, new_iterable(iterable), chunksize)
        return ImapResult(self._semaphore, result)

    def imap_unordered(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self._semaphore.acquire()
                yield elem
        if chunksize > self._limit:
            raise ValueError(f'chunksize argument exceeds {self._limit}')
        result = super().imap_unordered(func, new_iterable(iterable), chunksize)
        return ImapResult(self._semaphore, result)

class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        limit = self._processes + max_waiting_tasks
        BoundedQueuePool.__init__(self, limit, multiprocessing.BoundedSemaphore(limit))

class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        limit = self._processes + max_waiting_tasks
        BoundedQueuePool.__init__(self, limit, threading.BoundedSemaphore(limit))


#######################################################################

from time import sleep


def process_line(line):
    sleep(3)
    # the lines already have line end characters:
    print(line, end='')
    return True

if __name__ == "__main__":
    pool = BoundedQueueProcessPool(2)
    with open("test.txt") as file:
        for res in pool.imap(process_line, file):
            #print(res)
            pass
    pool.close()
    pool.join()
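
Since the real workload is I/O-bound HTTP PUT requests rather than CPU-bound work, the BoundedQueueThreadPool variant defined above can be dropped in the same way; a hedged sketch, again assuming the requests library and a placeholder endpoint (the pool size of 50 is only illustrative):

import requests

ENDPOINT = "http://example.com/api"  # placeholder URL, not the real service

def put_line(line):
    # Return True for a 200 response, False otherwise.
    resp = requests.put(ENDPOINT, data=line.strip().encode("latin-1"))
    return resp.status_code == 200

if __name__ == "__main__":
    pool = BoundedQueueThreadPool(50)
    with open(r"D:\txtFile", encoding="latin-1") as file:
        # imap is throttled by the bounded semaphore, so memory stays flat.
        accepted = sum(1 for ok in pool.imap(put_line, file) if ok)
        print(accepted, "lines accepted")
    pool.close()
    pool.join()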