Parallel web requests with GPU on Google Colab

I need to fetch the properties of a large number of products (~25,000) from a web service, and this is a highly time-sensitive operation (ideally it should finish within a few seconds). As a proof of concept I first coded it with a for loop, but that took 1.25 hours. I would like to vectorize this code and execute the HTTP requests in parallel using the GPU on Google Colab. I have stripped out many unnecessary details, but it is important to note that the products and their web-service URLs are stored in a DataFrame.

Would it be faster to execute this on a GPU, or should I use multiple threads on the CPU?

What is the best way to implement this, and how can I save the results of the parallel processes into the results DataFrame (all_product_properties) without running into concurrency problems?

Each product has multiple properties (key-value pairs) that I get from the JSON response, but the product_id is not included in the JSON response, so I need to add the product_id to the DataFrame myself.

import json

import pandas as pd
import requests

#DataFrame containing a string column of URLs
urls = pd.DataFrame(["www.url1.com", "www.url2.com", ..., "www.url3.com"], columns=["url"])

#initialize empty dataframe to store properties for all products
all_product_properties = pd.DataFrame(columns=["product_id", "property_name", "property_value"])

for i in range(len(urls)):

  curr_url = urls.loc[i, "url"]

  try:
    http_response = requests.get(curr_url)

    if http_response is not None:

      http_response_json = json.loads(http_response.text)

      #extract product properties from JSON response
      product_properties_json = http_response_json['product_properties']
      curr_product_properties_df = pd.json_normalize(product_properties_json)

      #add product id since it's not returned in the JSON
      curr_product_properties_df["product_id"] = i

      #save current product properties to DataFrame containing all product properties
      all_product_properties = pd.concat([all_product_properties, curr_product_properties_df])

  except Exception as e:
    print(e)

GPUs probably won't help here, since they are designed to accelerate number crunching. However, since you are trying to parallelize I/O-bound HTTP requests, you can use Python multithreading (part of the standard library) to reduce the time required.

Also, concatenating pandas DataFrames inside a loop is a very slow operation. Instead, you can append each loop iteration's output to a list and run a single concat once the loop is done.
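
For illustration, here is a minimal sketch of the list-then-concat pattern (assuming pandas is imported as pd; the loop body and column names are placeholders, not your actual request logic):

# Collect each iteration's output in a plain Python list
chunks = []
for i in range(3):
    # Placeholder for one product's properties
    chunk = pd.DataFrame({"property_name": ["color"], "property_value": ["red"]})
    chunk["product_id"] = i
    chunks.append(chunk)  # appending a reference to a list is cheap

# One concat at the end avoids repeatedly copying an ever-growing DataFrame
result = pd.concat(chunks, ignore_index=True)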

Here is how I would implement your code using multithreading:

import concurrent.futures
import json
import threading
import time

import pandas as pd
import requests

# Each thread gets its own requests.Session (sessions are not thread-safe)
thread_local = threading.local()

def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(i, url):
    # i is the product's positional index, used as its product_id
    session = get_session()

    try:
        with session.get(url) as response:
            if response is not None:
                http_response_json = json.loads(response.text)
                product_properties_json = http_response_json['product_properties']
                curr_product_properties_df = pd.json_normalize(product_properties_json)

                #add product id since it's not returned in the JSON
                curr_product_properties_df["product_id"] = i

                #save current product properties to DataFrame containing all product properties
                print(f"Read {len(response.content)} from {url}")
                return curr_product_properties_df
    except Exception as e:
        print(e)
        

def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # map over (index, url) pairs; results are yielded in input order
        results = executor.map(download_site, range(len(sites)), sites)
        # drop None entries left by failed requests
        return [df for df in results if df is not None]


if __name__ == "__main__":
    # Store URLs as list, example below
    urls = ["https://www.jython.org", "http://olympus.realpython.org/dice"] * 10
    start_time = time.time()
    all_product_properties = download_all_sites(urls)
    all_product_properties = pd.concat(all_product_properties, ignore_index=True)
    
    duration = time.time() - start_time
    print(f"Downloaded {len(urls)} in {duration} seconds")

Reference: the RealPython article on multithreading and multiprocessing in Python: https://realpython.com/python-concurrency/