concurrent.futures.ThreadPoolExecutor / 多线程内存不足(已终止)
concurrent.futures.ThreadPoolExecutor / Multithreading runs out of memory (Killed)
我目前正在学习一个据称简单的网络抓取项目 python。我有一个大约 70MB 的列表,其中包含我要处理的几百万个 IP 地址 (sys.argv[1])。当然,并不是所有的都可以到达。
我正在尝试使用 concurrent.futures,目前遇到内存问题 - 最终导致整个进程被终止。
现在,我按照建议将我的未来分为两组(完成和未完成)here。
我正在使用大约 100 个工作人员 (sys.argv[2]) 并且有 1GB 可用内存。
虽然所有已完成的期货都会在调用 future.results() 时释放 => 期货 1000 已完成?但是,它似乎只是在减慢进程(包括内存被填充直到进程被杀死)。
我在这里错过了什么?关于如何处理这个问题有什么建议吗?
提前致谢。
我的代码如下:
import sys
import requests
import concurrent.futures
import urllib3
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def title(host):
try:
url="https://"+host
r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
tree = fromstring(r.content.decode('utf-8'))
title = tree.findtext('.//title')
print(host+": "+title)
except:
pass
max=int(sys.argv[2])
with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
futures_done = set()
futures_notdone = set()
with open(sys.argv[1]) as f:
for line in f:
host = line.strip()
futures_notdone.add(executor.submit(title, host))
if len(futures_notdone) >= max:
done, futures_notdone = concurrent.futures.wait(futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
futures_done.update(done)
for future in futures_done:
if len(futures_done) >= 1000:
future.result()
看起来您正在将已完成的期货存储在一个集合中,但稍后没有清除此列表,因此它会变得非常大。这可能是您的内存问题的原因。 future的.release()
方法没有release,还在done_future
列表中引用
不完美,但您可以尝试以下方法。它最多安排 max
个作业同时执行。它定期收集已完成的工作并重新安排新工作。 idea 来自这个博客。
我在这个方法中看到的缺点是它必须定期轮询 max
计划的作业以找到已完成的作业,如果 max
值很大,这可能会很慢。
import sys
import requests
import concurrent.futures
import urllib3
from itertools import islice
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def title(host: str) -> str:
try:
url="https://"+host
r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
tree = fromstring(r.content.decode('utf-8'))
title = tree.findtext('.//title')
return host+": "+title
except:
pass
max = int(sys.argv[2])
with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
with open(sys.argv[1]) as f:
futures = {executor.submit(title, h) for h in islice(f, max)}
while futures:
done, futures = concurrent.futures.wait(
futures, return_when=concurrent.futures.FIRST_COMPLETED)
for future in done:
print(future.result())
for h in islice(f, len(done)):
futures.add(executor.submit(title, h))
这里有一个可能对您有用的解决方法,它 运行 超过 100 万次迭代,而无需在我的计算机上使用超过 150 Mo。
它只是一个带有两个队列的自定义线程池,用于管理并发资源访问并限制最大并发数。
import sys
from typing import Optional
import requests
import urllib3
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from queue import Queue
from threading import Thread
def get_title(host: str) -> Optional[str]:
try:
url = f"https://{host}"
r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=1, verify=False)
tree = fromstring(r.content.decode('utf-8'))
title = tree.findtext('.//title')
return f"{host}: {title}"
except Exception:
return None
class Pool:
def __init__(self, work, max_concurrent_jobs, max_worker: int = 32) -> None:
self.max_workers = max_worker
self.work_queue = Queue(max_concurrent_jobs)
self.out_queue = Queue()
self.is_running = True
def _work():
while self.is_running:
item = self.work_queue.get()
result = work(item)
self.work_queue.task_done()
self.out_queue.put(result)
for _ in range(max_worker):
Thread(target=_work).start()
def close(self):
self.is_running = False
if __name__ == "__main__":
file_name = sys.argv[1]
max = int(sys.argv[2])
pool = Pool(work=get_title, max_concurrent_jobs=max)
def worker():
while True:
item = pool.out_queue.get()
if item is not None:
print(item) # Or any follow-up job
pool.out_queue.task_done()
Thread(target=worker, daemon=True).start()
with open(file_name) as f:
for h in f:
pool.work_queue.put(h.strip())
我目前正在学习一个据称简单的网络抓取项目 python。我有一个大约 70MB 的列表,其中包含我要处理的几百万个 IP 地址 (sys.argv[1])。当然,并不是所有的都可以到达。
我正在尝试使用 concurrent.futures,目前遇到内存问题 - 最终导致整个进程被终止。
现在,我按照建议将我的未来分为两组(完成和未完成)here。 我正在使用大约 100 个工作人员 (sys.argv[2]) 并且有 1GB 可用内存。
虽然所有已完成的期货都会在调用 future.results() 时释放 => 期货 1000 已完成?但是,它似乎只是在减慢进程(包括内存被填充直到进程被杀死)。
我在这里错过了什么?关于如何处理这个问题有什么建议吗?
提前致谢。
我的代码如下:
import sys
import requests
import concurrent.futures
import urllib3
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def title(host):
try:
url="https://"+host
r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
tree = fromstring(r.content.decode('utf-8'))
title = tree.findtext('.//title')
print(host+": "+title)
except:
pass
max=int(sys.argv[2])
with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
futures_done = set()
futures_notdone = set()
with open(sys.argv[1]) as f:
for line in f:
host = line.strip()
futures_notdone.add(executor.submit(title, host))
if len(futures_notdone) >= max:
done, futures_notdone = concurrent.futures.wait(futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
futures_done.update(done)
for future in futures_done:
if len(futures_done) >= 1000:
future.result()
看起来您正在将已完成的期货存储在一个集合中,但稍后没有清除此列表,因此它会变得非常大。这可能是您的内存问题的原因。 future的.release()
方法没有release,还在done_future
列表中引用
不完美,但您可以尝试以下方法。它最多安排 max
个作业同时执行。它定期收集已完成的工作并重新安排新工作。 idea 来自这个博客。
我在这个方法中看到的缺点是它必须定期轮询 max
计划的作业以找到已完成的作业,如果 max
值很大,这可能会很慢。
import sys
import requests
import concurrent.futures
import urllib3
from itertools import islice
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def title(host: str) -> str:
try:
url="https://"+host
r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
tree = fromstring(r.content.decode('utf-8'))
title = tree.findtext('.//title')
return host+": "+title
except:
pass
max = int(sys.argv[2])
with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
with open(sys.argv[1]) as f:
futures = {executor.submit(title, h) for h in islice(f, max)}
while futures:
done, futures = concurrent.futures.wait(
futures, return_when=concurrent.futures.FIRST_COMPLETED)
for future in done:
print(future.result())
for h in islice(f, len(done)):
futures.add(executor.submit(title, h))
这里有一个可能对您有用的解决方法,它 运行 超过 100 万次迭代,而无需在我的计算机上使用超过 150 Mo。
它只是一个带有两个队列的自定义线程池,用于管理并发资源访问并限制最大并发数。
import sys
from typing import Optional
import requests
import urllib3
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from queue import Queue
from threading import Thread
def get_title(host: str) -> Optional[str]:
try:
url = f"https://{host}"
r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=1, verify=False)
tree = fromstring(r.content.decode('utf-8'))
title = tree.findtext('.//title')
return f"{host}: {title}"
except Exception:
return None
class Pool:
def __init__(self, work, max_concurrent_jobs, max_worker: int = 32) -> None:
self.max_workers = max_worker
self.work_queue = Queue(max_concurrent_jobs)
self.out_queue = Queue()
self.is_running = True
def _work():
while self.is_running:
item = self.work_queue.get()
result = work(item)
self.work_queue.task_done()
self.out_queue.put(result)
for _ in range(max_worker):
Thread(target=_work).start()
def close(self):
self.is_running = False
if __name__ == "__main__":
file_name = sys.argv[1]
max = int(sys.argv[2])
pool = Pool(work=get_title, max_concurrent_jobs=max)
def worker():
while True:
item = pool.out_queue.get()
if item is not None:
print(item) # Or any follow-up job
pool.out_queue.task_done()
Thread(target=worker, daemon=True).start()
with open(file_name) as f:
for h in f:
pool.work_queue.put(h.strip())