Packages that are imported are not recognized during parallel computing?

I ran the function get_content with a multiprocess.Pool parallel setup, and it throws the error NameError: name 'session' is not defined. Clearly I did define it, with session = requests.Session(). Can you explain in detail what is going on here?

import requests, os
from bs4 import BeautifulSoup
from multiprocess import Pool, freeze_support
core = os.cpu_count()
session = requests.Session() 
headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0'}

links = ['https://www.investopedia.com/terms/1/0x-protocol.asp',
         'https://www.investopedia.com/terms/1/1-10net30.asp',
         'https://www.investopedia.com/terms/1/10-k.asp',
         'https://www.investopedia.com/terms/1/10k-wrap.asp',
         'https://www.investopedia.com/terms/1/10q.asp']

############ Get content of a word
def get_content(l):
    r = session.get(l, headers = headers)
    soup = BeautifulSoup(r.content, 'html.parser')
    entry_name = soup.select_one('#article-heading_3-0').contents[0]
    main = soup.select('.comp.article-body.mntl-block')[0]
    content = entry_name + '\n' + '<link href="investopedia.css" rel="stylesheet"/>' + '\n' + str(main) + '\n</>\n'
    return(content)

############ Parallel computing
if __name__=="__main__":
    P = Pool(processes = core)   
    content_list = P.map(get_content, links)
    content_all = ''.join(content_list)    
    freeze_support()

First of all, your import statement is wrong; it should be:

from multiprocessing import Pool, freeze_support

(You had from multiprocess ..., so I am at a loss to understand how it ran at all.)

With the correct import statement the code runs for me, but it does not do what you think it does! From the call to freeze_support I infer that you are running under Windows. On that platform new processes are started with the spawn method, which re-executes the whole program from the top in each new process. That is why the code that creates new processes must sit inside a block governed by if __name__ == '__main__':. If it did not, your newly created processes would re-execute the code that just created them in a never-ending recursive loop, spawning new processes forever.
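
As a side illustration (not part of your code; the square function below is just a placeholder task), here is a minimal sketch of that spawn behaviour:

from multiprocessing import Pool

# Under the spawn start method every new process re-imports this module from the
# top, so this line prints once in the parent and once in every worker.
print('module-level code runs in every process')

def square(x):   # placeholder task, not from the original code
    return x * x

if __name__ == '__main__':
    # Pool creation must be guarded: without the guard, every spawned worker would
    # try to create its own pool while re-importing the module, recursing forever.
    with Pool(processes=2) as pool:
        print(pool.map(square, range(5)))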

This means that every process re-creates its own Session instance, because the following statement is at global scope:

session = requests.Session()

So you are not actually reusing the same Session instance across the URLs you are retrieving, and you get none of its benefit. To get any reuse of a Session you must initialize the multiprocessing pool itself with the session object: each worker process then receives it once at start-up and stores it as a global that get_content can use for every URL that worker handles. You should also keep the executable code at global scope to a minimum:

import requests, os
from bs4 import BeautifulSoup
from multiprocessing import Pool, freeze_support

headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0'}

def init_pool(s):
    # Runs once in every worker process: store the Session passed in from the
    # parent as a global so that get_content can use it.
    global session
    session = s


############ Get content of a word
def get_content(l):
    r = session.get(l, headers = headers)
    soup = BeautifulSoup(r.content, 'html.parser')
    entry_name = soup.select_one('#article-heading_3-0').contents[0]
    main = soup.select('.comp.article-body.mntl-block')[0]
    content = entry_name + '\n' + '<link href="investopedia.css" rel="stylesheet"/>' + '\n' + str(main) + '\n</>\n'
    return(content)

############ Parallel computing
if __name__=="__main__":
    freeze_support()   # only has an effect when the script is frozen into a Windows executable
    core = os.cpu_count()
    session = requests.Session()

    links = ['https://www.investopedia.com/terms/1/0x-protocol.asp',
             'https://www.investopedia.com/terms/1/1-10net30.asp',
             'https://www.investopedia.com/terms/1/10-k.asp',
             'https://www.investopedia.com/terms/1/10k-wrap.asp',
             'https://www.investopedia.com/terms/1/10q.asp']

    p = Pool(processes = core, initializer=init_pool, initargs=(session,))
    content_list = p.map(get_content, links)
    content_all = ''.join(content_list)
    print(content_all)

In reality, though, your code spends most of its time waiting for the URLs to be retrieved and only a little CPU time processing the returned HTML. That makes it a good candidate for multithreading rather than multiprocessing. The only changes you need to make to your original code to use multithreading are (1) removing all references to freeze_support (unless you plan to build an exe file) and (2) changing one import statement:

from multiprocessing.dummy import Pool

Also, when deciding how many threads to use you are no longer limited by the number of CPU cores (although there is still a maximum you will not want to exceed):

import requests, os
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool
session = requests.Session()
headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0'}

links = ['https://www.investopedia.com/terms/1/0x-protocol.asp',
         'https://www.investopedia.com/terms/1/1-10net30.asp',
         'https://www.investopedia.com/terms/1/10-k.asp',
         'https://www.investopedia.com/terms/1/10k-wrap.asp',
         'https://www.investopedia.com/terms/1/10q.asp']

############ Get content of a word
def get_content(l):
    r = session.get(l, headers = headers)
    soup = BeautifulSoup(r.content, 'html.parser')
    entry_name = soup.select_one('#article-heading_3-0').contents[0]
    main = soup.select('.comp.article-body.mntl-block')[0]
    content = entry_name + '\n' + '<link href="investopedia.css" rel="stylesheet"/>' + '\n' + str(main) + '\n</>\n'
    return(content)

############ Concurrent computing
if __name__=="__main__":
    # max of 25 is arbitrary; we do not want to appear to be a denial of service attack
    P = Pool(processes = min(len(links), 25))
    content_list = P.map(get_content, links)
    content_all = ''.join(content_list)
    print(content_all)

Finally, you can combine a thread pool with a multiprocessing pool, using the latter for the CPU-intensive part of the processing:

import requests, os
from bs4 import BeautifulSoup
from multiprocessing.pool import ThreadPool, Pool
from functools import partial


session = requests.Session()
headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0'}

links = ['https://www.investopedia.com/terms/1/0x-protocol.asp',
         'https://www.investopedia.com/terms/1/1-10net30.asp',
         'https://www.investopedia.com/terms/1/10-k.asp',
         'https://www.investopedia.com/terms/1/10k-wrap.asp',
         'https://www.investopedia.com/terms/1/10q.asp']

############ Get content of a word
def get_content(process_pool, l):
    r = session.get(l, headers = headers)
    return process_pool.apply(process_content, args=(r.content,))

def process_content(content):
    soup = BeautifulSoup(content, 'html.parser')
    entry_name = soup.select_one('#article-heading_3-0').contents[0]
    main = soup.select('.comp.article-body.mntl-block')[0]
    content = entry_name + '\n' + '<link href="investopedia.css" rel="stylesheet"/>' + '\n' + str(main) + '\n</>\n'
    return(content)


############ Parallel computing
if __name__=="__main__":
    process_pool = Pool(processes = min(len(links), os.cpu_count()))
    thread_pool = ThreadPool(processes = min(len(links), 25))
    content_list = thread_pool.map(partial(get_content, process_pool), links)
    content_all = ''.join(content_list)
    print(content_all)
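
One small housekeeping note on that last version: the two pools are never explicitly shut down. A minimal variant of the same __main__ block (assuming the same links, get_content and process_content as above) uses the pools as context managers, which both Pool and ThreadPool support, so they are cleaned up when the block exits:

############ Parallel computing, with explicit pool cleanup
if __name__=="__main__":
    # Both pools are closed/terminated automatically when the with-block exits.
    with Pool(processes = min(len(links), os.cpu_count())) as process_pool, \
         ThreadPool(processes = min(len(links), 25)) as thread_pool:
        content_list = thread_pool.map(partial(get_content, process_pool), links)
        content_all = ''.join(content_list)
        print(content_all)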