Python 多处理 class

Python multiprocessing a class

我正在尝试对 selenium 进行多进程处理,其中每个进程都由一个 selenium 驱动程序和一个会话生成(每个进程都与不同的帐户连接)。

我有一个要访问的 URL 列表。 每个 URL 需要被其中一个帐户访问一次(无论是哪个帐户)。

为了避免一些讨厌的全局变量管理,我尝试使用 multiprocessing.poolinitializer 使用 class 对象初始化每个进程。

在那之后,我不知道如何分配任务给进程知道每个进程使用的功能必须在class。

这是我正在尝试做的事情的简化版本:

from selenium import webdriver
import multiprocessing

account =  [{'account':1},{'account':2}]

class Collector():

    def __init__(self, account):

        self.account = account
        self.driver = webdriver.Chrome()

    def parse(self, item):

        self.driver.get(f"https://books.toscrape.com{item}")

if __name__ == '__main__':
    
    processes = 1
    pool = multiprocessing.Pool(processes,initializer=Collector,initargs=[account.pop()])

    items = ['/catalogue/a-light-in-the-attic_1000/index.html','/catalogue/tipping-the-velvet_999/index.html']
    
    pool.map(parse(), items, chunksize = 1)

    pool.close()
    pool.join() 

问题出现在 pool.map 行,子进程中没有对实例化对象的引用。 另一种方法是分发 URLs 并在初始化期间解析,但这会非常讨厌。

有办法实现吗?

我不确定这是否能解决您的问题。

如果您每个 URL 有一个帐户,那么您可以这样做:

from selenium import webdriver
from multiprocessing import Pool

items = ['/catalogue/a-light-in-the-attic_1000/index.html',
         '/catalogue/tipping-the-velvet_999/index.html']
accounts = [{'account': 1}, {'account': 2}]
baseurl = 'https://books.toscrape.com'

def process(i, a):
    print(f'Processing account {a}')
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')

    with webdriver.Chrome(options=options) as driver:
        driver.get(f'{baseurl}{i}')


def main():
    with Pool() as pool:
        pool.starmap(process, zip(items, accounts))


if __name__ == '__main__':
    main()

如果帐户数量与 URL 的数量不匹配,您已经说过哪个帐户从哪个 URL 获取并不重要。因此,在这种情况下,您可以 select 随机使用的帐户 (random.choice())

由于 Chrome 启动了自己的进程,当多线程就足够时,确实没有必要使用多处理。我想提供一个更通用的解决方案来处理以下情况:您有 N 个要检索的 URL,其中 N 可能非常大,但您希望限制并发 Selenium 会话的数量 MAX_DRIVERS 其中 MAX_DRIVERS 是一个小得多的数字。因此,您只想为池中的每个线程创建一个驱动程序会话,并在必要时重新使用它。然后问题变成了当你完成池时在驱动程序上调用 quit 这样你就不会留下任何 Selenium 进程 运行.

以下代码使用每个线程唯一的threadlocal存储来存储每个池线程的当前驱动程序实例,并使用class析构函数调用驱动程序的quit 方法在 class 实例被销毁时:

from selenium import webdriver
from multiprocessing.pool import ThreadPool
import threading

items = ['/catalogue/a-light-in-the-attic_1000/index.html',
         '/catalogue/tipping-the-velvet_999/index.html']
accounts = [{'account': 1}, {'account': 2}]
baseurl = 'https://books.toscrape.com'

threadLocal = threading.local()

class Driver:
    def __init__(self):
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")
        options.add_experimental_option('excludeSwitches', ['enable-logging'])
        self.driver = webdriver.Chrome(options=options)

    def __del__(self):
        self.driver.quit() # clean up driver when we are cleaned up
        print('The driver has been "quitted".')

    @classmethod
    def create_driver(cls):
        the_driver = getattr(threadLocal, 'the_driver', None)
        if the_driver is None:
            the_driver = cls()
            threadLocal.the_driver = the_driver
        return the_driver.driver


def process(i, a):
    print(f'Processing account {a}')
    driver = Driver.create_driver()
    driver.get(f'{baseurl}{i}')


def main():
    global threadLocal

    # We never want to create more than
    MAX_DRIVERS = 8 # Rather arbitrary
    POOL_SIZE = min(len(urls), MAX_DRIVERS)
    pool = ThreadPool(POOL_SIZE)
    pool.map(process, urls)
    # ensure the drivers are "quitted":
    del threadLocal
    import gc
    gc.collect() # a little extra insurance
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()