Python 多处理 class
Python multiprocessing a class
我正在尝试对 selenium 进行多进程处理,其中每个进程都由一个 selenium 驱动程序和一个会话生成(每个进程都与不同的帐户连接)。
我有一个要访问的 URL 列表。
每个 URL 需要被其中一个帐户访问一次(无论是哪个帐户)。
为了避免一些讨厌的全局变量管理,我尝试使用 multiprocessing.pool 的 initializer
使用 class 对象初始化每个进程。
在那之后,我不知道如何分配任务给进程知道每个进程使用的功能必须在class。
这是我正在尝试做的事情的简化版本:
from selenium import webdriver
import multiprocessing
account = [{'account':1},{'account':2}]
class Collector():
def __init__(self, account):
self.account = account
self.driver = webdriver.Chrome()
def parse(self, item):
self.driver.get(f"https://books.toscrape.com{item}")
if __name__ == '__main__':
processes = 1
pool = multiprocessing.Pool(processes,initializer=Collector,initargs=[account.pop()])
items = ['/catalogue/a-light-in-the-attic_1000/index.html','/catalogue/tipping-the-velvet_999/index.html']
pool.map(parse(), items, chunksize = 1)
pool.close()
pool.join()
问题出现在 pool.map
行,子进程中没有对实例化对象的引用。
另一种方法是分发 URLs 并在初始化期间解析,但这会非常讨厌。
有办法实现吗?
我不确定这是否能解决您的问题。
如果您每个 URL 有一个帐户,那么您可以这样做:
from selenium import webdriver
from multiprocessing import Pool
items = ['/catalogue/a-light-in-the-attic_1000/index.html',
'/catalogue/tipping-the-velvet_999/index.html']
accounts = [{'account': 1}, {'account': 2}]
baseurl = 'https://books.toscrape.com'
def process(i, a):
print(f'Processing account {a}')
options = webdriver.ChromeOptions()
options.add_argument('--headless')
with webdriver.Chrome(options=options) as driver:
driver.get(f'{baseurl}{i}')
def main():
with Pool() as pool:
pool.starmap(process, zip(items, accounts))
if __name__ == '__main__':
main()
如果帐户数量与 URL 的数量不匹配,您已经说过哪个帐户从哪个 URL 获取并不重要。因此,在这种情况下,您可以 select 随机使用的帐户 (random.choice())
由于 Chrome 启动了自己的进程,当多线程就足够时,确实没有必要使用多处理。我想提供一个更通用的解决方案来处理以下情况:您有 N 个要检索的 URL,其中 N 可能非常大,但您希望限制并发 Selenium 会话的数量 MAX_DRIVERS 其中 MAX_DRIVERS 是一个小得多的数字。因此,您只想为池中的每个线程创建一个驱动程序会话,并在必要时重新使用它。然后问题变成了当你完成池时在驱动程序上调用 quit
这样你就不会留下任何 Selenium 进程 运行.
以下代码使用每个线程唯一的threadlocal
存储来存储每个池线程的当前驱动程序实例,并使用class析构函数调用驱动程序的quit
方法在 class 实例被销毁时:
from selenium import webdriver
from multiprocessing.pool import ThreadPool
import threading
items = ['/catalogue/a-light-in-the-attic_1000/index.html',
'/catalogue/tipping-the-velvet_999/index.html']
accounts = [{'account': 1}, {'account': 2}]
baseurl = 'https://books.toscrape.com'
threadLocal = threading.local()
class Driver:
def __init__(self):
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_experimental_option('excludeSwitches', ['enable-logging'])
self.driver = webdriver.Chrome(options=options)
def __del__(self):
self.driver.quit() # clean up driver when we are cleaned up
print('The driver has been "quitted".')
@classmethod
def create_driver(cls):
the_driver = getattr(threadLocal, 'the_driver', None)
if the_driver is None:
the_driver = cls()
threadLocal.the_driver = the_driver
return the_driver.driver
def process(i, a):
print(f'Processing account {a}')
driver = Driver.create_driver()
driver.get(f'{baseurl}{i}')
def main():
global threadLocal
# We never want to create more than
MAX_DRIVERS = 8 # Rather arbitrary
POOL_SIZE = min(len(urls), MAX_DRIVERS)
pool = ThreadPool(POOL_SIZE)
pool.map(process, urls)
# ensure the drivers are "quitted":
del threadLocal
import gc
gc.collect() # a little extra insurance
pool.close()
pool.join()
if __name__ == '__main__':
main()
我正在尝试对 selenium 进行多进程处理,其中每个进程都由一个 selenium 驱动程序和一个会话生成(每个进程都与不同的帐户连接)。
我有一个要访问的 URL 列表。 每个 URL 需要被其中一个帐户访问一次(无论是哪个帐户)。
为了避免一些讨厌的全局变量管理,我尝试使用 multiprocessing.pool 的 initializer
使用 class 对象初始化每个进程。
在那之后,我不知道如何分配任务给进程知道每个进程使用的功能必须在class。
这是我正在尝试做的事情的简化版本:
from selenium import webdriver
import multiprocessing
account = [{'account':1},{'account':2}]
class Collector():
def __init__(self, account):
self.account = account
self.driver = webdriver.Chrome()
def parse(self, item):
self.driver.get(f"https://books.toscrape.com{item}")
if __name__ == '__main__':
processes = 1
pool = multiprocessing.Pool(processes,initializer=Collector,initargs=[account.pop()])
items = ['/catalogue/a-light-in-the-attic_1000/index.html','/catalogue/tipping-the-velvet_999/index.html']
pool.map(parse(), items, chunksize = 1)
pool.close()
pool.join()
问题出现在 pool.map
行,子进程中没有对实例化对象的引用。
另一种方法是分发 URLs 并在初始化期间解析,但这会非常讨厌。
有办法实现吗?
我不确定这是否能解决您的问题。
如果您每个 URL 有一个帐户,那么您可以这样做:
from selenium import webdriver
from multiprocessing import Pool
items = ['/catalogue/a-light-in-the-attic_1000/index.html',
'/catalogue/tipping-the-velvet_999/index.html']
accounts = [{'account': 1}, {'account': 2}]
baseurl = 'https://books.toscrape.com'
def process(i, a):
print(f'Processing account {a}')
options = webdriver.ChromeOptions()
options.add_argument('--headless')
with webdriver.Chrome(options=options) as driver:
driver.get(f'{baseurl}{i}')
def main():
with Pool() as pool:
pool.starmap(process, zip(items, accounts))
if __name__ == '__main__':
main()
如果帐户数量与 URL 的数量不匹配,您已经说过哪个帐户从哪个 URL 获取并不重要。因此,在这种情况下,您可以 select 随机使用的帐户 (random.choice())
由于 Chrome 启动了自己的进程,当多线程就足够时,确实没有必要使用多处理。我想提供一个更通用的解决方案来处理以下情况:您有 N 个要检索的 URL,其中 N 可能非常大,但您希望限制并发 Selenium 会话的数量 MAX_DRIVERS 其中 MAX_DRIVERS 是一个小得多的数字。因此,您只想为池中的每个线程创建一个驱动程序会话,并在必要时重新使用它。然后问题变成了当你完成池时在驱动程序上调用 quit
这样你就不会留下任何 Selenium 进程 运行.
以下代码使用每个线程唯一的threadlocal
存储来存储每个池线程的当前驱动程序实例,并使用class析构函数调用驱动程序的quit
方法在 class 实例被销毁时:
from selenium import webdriver
from multiprocessing.pool import ThreadPool
import threading
items = ['/catalogue/a-light-in-the-attic_1000/index.html',
'/catalogue/tipping-the-velvet_999/index.html']
accounts = [{'account': 1}, {'account': 2}]
baseurl = 'https://books.toscrape.com'
threadLocal = threading.local()
class Driver:
def __init__(self):
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_experimental_option('excludeSwitches', ['enable-logging'])
self.driver = webdriver.Chrome(options=options)
def __del__(self):
self.driver.quit() # clean up driver when we are cleaned up
print('The driver has been "quitted".')
@classmethod
def create_driver(cls):
the_driver = getattr(threadLocal, 'the_driver', None)
if the_driver is None:
the_driver = cls()
threadLocal.the_driver = the_driver
return the_driver.driver
def process(i, a):
print(f'Processing account {a}')
driver = Driver.create_driver()
driver.get(f'{baseurl}{i}')
def main():
global threadLocal
# We never want to create more than
MAX_DRIVERS = 8 # Rather arbitrary
POOL_SIZE = min(len(urls), MAX_DRIVERS)
pool = ThreadPool(POOL_SIZE)
pool.map(process, urls)
# ensure the drivers are "quitted":
del threadLocal
import gc
gc.collect() # a little extra insurance
pool.close()
pool.join()
if __name__ == '__main__':
main()