Python Multiprocessing gets stuck with selenium
So I have code that starts up 4 selenium chrome drivers and scrapes data from an element on the pages. The code can be simplified to something like this:
import json
import math
import multiprocessing as mp
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait

class scraper():
    def __init__(self, list_of_urls, process_num):
        self.urls = list_of_urls
        self.process_num = process_num

    def scrape_urls(self):
        driver = webdriver.Chrome(driver_dir)
        data = []
        for url in self.urls:
            driver.get(url)
            element = WebDriverWait(driver, timeout=7).until(lambda d: d.find_element(by=By.CLASS_NAME, value="InterestingData"))
            data.append(element.text)
            print("Scraper # ", self.process_num, " got data from: ", url)
        return data
if __name__ == '__main__':
    with open('array_of_urls', 'r') as infile:
        urls = json.load(infile)

    number_of_processes = 4
    length_of_urls = len(urls)
    partition_into = math.ceil(length_of_urls / number_of_processes)
    scrapers = []
    start = 0
    end = start + partition_into
    for num in range(number_of_processes):
        new_scraper = scraper(urls[start:end], num)
        scrapers.append(new_scraper)
        start = end
        end = start + partition_into
        if end > length_of_urls:
            end = length_of_urls - 1

    with mp.Pool(processes=number_of_processes) as pool:
        result_array = []
        for num in range(number_of_processes):
            result_array.append(pool.apply_async(scrapers[num].scrape_urls))
        pool.close()
        pool.join()
The problem I'm running into is that after 5-10 minutes one of the scrapers just stops, and the only thing that wakes it up is manually refreshing the page in its browser. If I leave it alone for an hour or so, 3 of the 4 have stopped and only one is still running. They don't error out or print anything, they simply stop running. I've tried this on two different laptops and both have the same issue. I've also tried doing this with 4 separate mp.Process() instances running scrape_urls, and that does the same thing. Has anyone else run into this issue, or am I doing something wrong here?
For one thing, Selenium is already creating a process for each browser, so it would be far better to use multithreading instead of multiprocessing, since every thread is going to start a process regardless. Also, in scrape_urls, everything after the driver = webdriver.Chrome(driver_dir) statement should be enclosed in a try/finally block where the finally clause calls driver.quit(), to ensure that the driver process is terminated whether or not an exception occurs. Right now you are leaving all of the driver processes running.
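As a minimal sketch of that try/finally change (keeping the rest of your scrape_urls method and your driver_dir path exactly as they are in your code):

def scrape_urls(self):
    driver = webdriver.Chrome(driver_dir)
    try:
        data = []
        for url in self.urls:
            driver.get(url)
            element = WebDriverWait(driver, timeout=7).until(lambda d: d.find_element(by=By.CLASS_NAME, value="InterestingData"))
            data.append(element.text)
            print("Scraper # ", self.process_num, " got data from: ", url)
        return data
    finally:
        # Always terminate the chromedriver/browser process, even if an exception was raised
        driver.quit()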
You might also consider using the following technique, which creates a thread pool of size 4 (or smaller, depending on how many URLs there are to process) in which each thread automatically reuses the driver that has been allocated to it, kept in thread-local storage. You may wish to change the options used to create the driver (currently "headless" mode):
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from multiprocessing.pool import ThreadPool
import threading
import gc

threadLocal = threading.local()

class Driver:
    def __init__(self):
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")
        options.add_experimental_option('excludeSwitches', ['enable-logging'])
        self.driver = webdriver.Chrome(options=options)

    def __del__(self):
        self.driver.quit()  # clean up driver when we are cleaned up
        print('The driver has been "quitted".')

    @classmethod
    def create_driver(cls):
        the_driver = getattr(threadLocal, 'the_driver', None)
        if the_driver is None:
            print('Creating new driver.')
            the_driver = cls()
            threadLocal.the_driver = the_driver
        driver = the_driver.driver
        the_driver = None
        return driver

def scraper(url):
    """
    This now scrapes a single URL.
    """
    driver = Driver.create_driver()
    driver.get(url)
    element = WebDriverWait(driver, timeout=7).until(lambda d: d.find_element(by=By.CLASS_NAME, value="InterestingData"))
    print("got data from: ", url)
    return element.text

with open('array_of_urls', 'r') as infile:
    urls = json.load(infile)

number_of_processes = min(4, len(urls))
with ThreadPool(processes=number_of_processes) as pool:
    result_array = pool.map(scraper, urls)
    # Must ensure drivers are quitted before threads are destroyed:
    del threadLocal
    # This should ensure that the __del__ method is run on class Driver:
    gc.collect()
    pool.close()
    pool.join()