Can't read/write to files using multithreading in python

I have an input file that contains a long list of URLs. Let's say it is mylines.txt:

https://yahoo.com
https://google.com
https://facebook.com
https://twitter.com

What I need to do is:

  1. Read a line from the input file mylines.txt

  2. Execute the myFunc function. This will perform some task and produce one line of output. It's more complex in my real code, but conceptually it's like this.

  3. Write the output to the results.txt file

Since my input is huge, I need to take advantage of Python multithreading. I looked at this nice post here, but unfortunately it assumes the input is a simple list, and it doesn't cover writing the function's output to a file.

I need to make sure that the output for each input is written on a single line (i.e., the danger is that multiple threads writing to the same line would leave me with incorrect data).
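
To illustrate what I mean, here is a minimal sketch of the kind of interleaving-safe write I'm after (write_result and write_lock are names I made up for the example), using a shared threading.Lock so only one thread writes at a time:

import threading

write_lock = threading.Lock()  # shared lock guarding the output file

def write_result(line):
    # Only one thread can hold the lock at a time, so lines never interleave.
    with write_lock:
        with open("results.txt", "a") as f:
            f.write(line + "\n")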

I tried fiddling with it, but with no success. I have never used Python multithreading before, but now is the time to learn, as it's unavoidable in my case: I have a very long list that I can't get through in a reasonable time without multithreading. My real function won't do this simple task, but will do more operations that aren't needed for the concept.

Here is my attempt. Please correct me (in the code itself):

import threading
import requests
from multiprocessing.dummy import Pool as ThreadPool
import Queue

def myFunc(url):
        response = requests.get(url, verify=False ,timeout=(2, 5))
        results = open("myresults","a") # "a" to append results
        results.write("url is:",url, ", response is:", response.url)
        results.close()

worker_data = open("mylines.txt","r") # open my input file.

#load up a queue with your data, this will handle locking
q = Queue.Queue()

for url in worker_data:
    q.put(url)

# make the Pool of workers
pool = ThreadPool(4)
results = pool.map(myFunc, q)

# close the pool and wait for the work to finish
pool.close()
pool.join()

Q: How can I fix the code above (please be concise and help me within the code itself) so it reads a line from the input file, executes the function, and runs the requests concurrently using Python multithreading, so that I can finish my list in a reasonable time?

Update:

Based on the answer, the code became:

import threading
import requests
from multiprocessing.dummy import Pool as ThreadPool
import queue
from multiprocessing import Queue

def myFunc(url):
    response = requests.get(url, verify=False ,timeout=(2, 5))
    return "url is:" + url + ", response is:" + response.url

worker_data = open("mylines.txt","r") # open my input file.

#load up a queue with your data, this will handle locking
q = queue.Queue(4)
with open("mylines.txt","r") as f: # open my input file.
    for url in f:
        q.put(url)

# make the Pool of workers
pool = ThreadPool(4)
results = pool.map(myFunc, q)

with open("myresults","w") as f:
    for line in results:
        f.write(line + '\n')

mylines.txt contains:

https://yahoo.com
https://www.google.com
https://facebook.com
https://twitter.com

Note that I first used:

import Queue

and: q = Queue.Queue(4)

but I got an error saying:

Traceback (most recent call last):
  File "test3.py", line 4, in <module>
    import Queue
ModuleNotFoundError: No module named 'Queue'

Based on some searching, I changed it to:

import queue

and changed the relevant line to: q = queue.Queue(4)
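
(That rename is expected: Python 2's Queue module became queue in Python 3, which is what the ModuleNotFoundError above is telling you. For reference, a version-agnostic import looks like this:)

try:
    import queue           # Python 3
except ImportError:
    import Queue as queue  # Python 2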

I also added:

from multiprocessing import Queue

But nothing worked. Can an expert in Python multithreading help?

You should change the function to return a string:

def myFunc(url):
    response = requests.get(url, verify=False ,timeout=(2, 5))
    return "url is:" + url + ", response is:" + response.url

and later write those strings to the file:

results = pool.map(myFunc, q)

with open("myresults","w") as f:
    for line in results:
        f.write(line + '\n')

This keeps the multithreading working for the requests.get calls, but serializes the writing of the results to the output file.
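
For reference, here is a minimal self-contained sketch of this first version. Note that Pool.map needs an iterable, and a queue.Queue is not one, so the URLs are read into a plain list here (file names as in the question):

import requests
from multiprocessing.dummy import Pool as ThreadPool

def myFunc(url):
    response = requests.get(url, timeout=(2, 5))
    return "url is:" + url + ", response is:" + response.url

# Read the URLs into a list; Pool.map will hand them out to the threads.
with open("mylines.txt") as f:
    urls = [line.strip() for line in f]

pool = ThreadPool(4)
results = pool.map(myFunc, urls)  # runs myFunc concurrently over the list
pool.close()
pool.join()

# Single-threaded write: no risk of interleaved lines.
with open("myresults", "w") as f:
    for line in results:
        f.write(line + "\n")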

Update:

You should also use with to read the input file:

#load up a queue with your data, this will handle locking
q = Queue.Queue()

with open("mylines.txt","r") as f: # open my input file.
    for url in f:
        q.put(url)

Rather than have the worker pool threads print out the results, which does not guarantee the output will be buffered correctly, create one more thread that reads results from a second Queue and prints them.

I've modified your solution so it builds its own pool of worker threads. There's little point giving the queue unlimited length, because the main thread blocks when the queue reaches maximum size: you only need it long enough to ensure the workers always have work to process; the main thread will block and unblock as the queue size rises and falls.
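
(A tiny illustration of that blocking behaviour, with a made-up capacity and payloads:)

import queue

q = queue.Queue(maxsize=2)  # bounded queue
q.put("job-1")
q.put("job-2")              # the queue is now full
# q.put("job-3") would block here until a consumer calls q.get()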

It also identifies which thread was responsible for each item on the output queue, which should give you some confidence that the multithreading approach is working, and it prints the response code from the server. I found I had to strip the newlines from the URLs.

Since only one thread now writes to the file, the writes are always perfectly in sequence and there is no possibility of them interfering with each other.

import threading
import requests
import queue
POOL_SIZE = 4

def myFunc(inq, outq):  # worker thread deals only with queues
    while True:
        url = inq.get()  # Blocks until something available
        if url is None:
            break
        response = requests.get(url.strip(), timeout=(2, 5))
        outq.put((url, response, threading.current_thread().name))


class Writer(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self.results = open("myresults","a") # "a" to append results
        self.queue = q
    def run(self):
        while True:
            url, response, threadname = self.queue.get()
            if response is None:
                self.results.close()
                break
            print("****url is:",url, ", response is:", response.status_code, response.url, "thread", threadname, file=self.results)

#load up a queue with your data, this will handle locking
inq = queue.Queue()  # could usefully limit queue size here
outq = queue.Queue()

# start the Writer
writer = Writer(outq)
writer.start()

# make the Pool of workers
threads = []
for i in range(POOL_SIZE):
    thread = threading.Thread(target=myFunc, name=f"worker{i}", args=(inq, outq))
    thread.start()
    threads.append(thread)

# push the work onto the queues
with open("mylines.txt","r") as worker_data: # open my input file.
    for url in worker_data:
        inq.put(url.strip())
for thread in threads:
    inq.put(None)

# close the pool and wait for the workers to finish
for thread in threads:
    thread.join()

# Terminate the writer
outq.put((None, None, None))
writer.join()

With the data given in mylines.txt, I see the following output:

****url is: https://www.google.com , response is: 200 https://www.google.com/ thread worker1
****url is: https://twitter.com , response is: 200 https://twitter.com/ thread worker2
****url is: https://facebook.com , response is: 200 https://www.facebook.com/ thread worker0
****url is: https://www.censys.io , response is: 200 https://censys.io/ thread worker1
****url is: https://yahoo.com , response is: 200 https://uk.yahoo.com/?p=us thread worker3