Concurrent.futures + requests_html 的 render() = "There is no current event loop in thread 'ThreadPoolExecutor-0_0'."

Concurrent.futures + requests_html's render() = "There is no current event loop in thread 'ThreadPoolExecutor-0_0'."

我正在使用 请求 HTML 在页面上呈现 javascript。我还使用 concurrent.futures 来加快进程。在添加以下行之前,我的代码运行良好:

response.html.render(timeout=60, sleep=1, wait=3, retries=10)

我收到错误:

response.html.render(timeout=60, sleep=1, wait=3, retries=10)   

File "C:\Users\Ze\Anaconda3\lib\site-packages\requests_html.py", line 586, in render self.browser = self.session.browser # Automatically create a event loop and browser
File "C:\Users\Ze\Anaconda3\lib\site-packages\requests_html.py", line 727, in browser self.loop = asyncio.get_event_loop()
File "C:\Users\Ze\Anaconda3\lib\asyncio\events.py", line 639, in get_event_loop raise RuntimeError('There is no current event loop in thread %r.' RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

如果我将有问题的行移动到下面的部分中,它会再次工作,但渲染不会并行发生,对吗?

for result in concurrent.futures.as_completed(futures):
    result = result.result()

导致问题的原因是什么?我从未使用过异步。我必须这样做吗?容易实现吗?

非常感谢!

代码:

def load_page_and_extract_items(url):
    response = session.get(url, headers=get_headers())

    # render javascript
    response.html.render(timeout=60, wait=3)
    source = BeautifulSoup(response.html.raw_html, 'lxml')


def get_pages(remaining_urls):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # for each of 60 possible pages
        for current_page_number in range(60):
            futures = [executor.submit(load_page_and_extract_items, url) for url in remaining_urls]

            for result in concurrent.futures.as_completed(futures):
                result = result.result()

def main():
    get_pages(urls)

这并没有直接回答问题,而是展示了一种在我的测试中表现良好的多线程网络抓取技术。它使用原始问题中所述的 URL 并搜索 [可能] 包含 HREF 的某些标签,然后处理这些 URL。一般的想法是我创建一个会话池,每个线程从池(一个队列)中获取一个会话对象,使用它然后将它放回队列中,从而使其可供其他线程使用。

from requests_html import HTMLSession
import concurrent.futures
import queue

QUEUE = queue.Queue()


def makeSessions(n=4):
    for _ in range(n):
        QUEUE.put(HTMLSession())


def cleanup():
    while True:
        try:
            getSession(False).close()
        except queue.Empty:
            break


def getSession(block=True):
    return QUEUE.get(block=block)


def freeSession(session):
    if isinstance(session, HTMLSession):
        QUEUE.put(session)


def getURLs():
    urls = []
    session = getSession()
    try:
        response = session.get('https://www.aliexpress.com')
        response.raise_for_status()
        response.html.render()
        for a in response.html.xpath('//dt[@class="cate-name"]/span/a'):
            if 'href' in a.attrs:
                urls.append(a.attrs['href'])
    finally:
        freeSession(session)
    return urls


def processURL(url):
    print(url)
    session = getSession()
    try:
        response = session.get(url)
        response.raise_for_status()
        response.html.render()
    finally:
        freeSession(session)


if __name__ == '__main__':
    try:
        makeSessions()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(processURL, url) for url in getURLs()]
            for _ in concurrent.futures.as_completed(futures):
                pass
    finally:
        cleanup()