Parallel web requests with GPU on Google Colab
I need to fetch the properties of a large number of products (~25,000) from a web service, and this is a very time-sensitive operation (ideally I need it to run within a few seconds). As a proof of concept I first coded it with a for loop, but that takes 1.25 hours. I would like to vectorize this code and execute the HTTP requests in parallel with a GPU on Google Colab. I have stripped out many unnecessary details, but it is important to note that the products and their web service URLs are stored in a DataFrame.
Would this be faster on a GPU, or should I use multiple threads on the CPU?
What is the best way to implement this, and how can I save the results from the parallel processes into the resulting DataFrame (all_product_properties) without running into concurrency problems?
Each product has several properties (key-value pairs) that I get from the JSON response, but the product_id is not included in the JSON response, so I need to add it to the DataFrame myself.
import json

import pandas as pd
import requests

# DataFrame containing a string column of URLs
urls = pd.DataFrame(["www.url1.com", "www.url2.com", ..., "www.url3.com"], columns=["url"])

# Initialize an empty DataFrame to store the properties of all products
all_product_properties = pd.DataFrame(columns=["product_id", "property_name", "property_value"])

for i in range(len(urls)):
    curr_url = urls.loc[i, "url"]

    try:
        http_response = requests.request("GET", curr_url)
        if http_response is not None:
            http_response_json = json.loads(http_response.text)

            # Extract the product properties from the JSON response
            product_properties_json = http_response_json['product_properties']
            curr_product_properties_df = pd.json_normalize(product_properties_json)

            # Add the product id since it's not returned in the JSON
            curr_product_properties_df["product_id"] = i

            # Save this product's properties to the DataFrame holding all product properties
            all_product_properties = pd.concat([all_product_properties, curr_product_properties_df])

    except Exception as e:
        print(e)
GPUs probably won't help here, since they are designed to accelerate number crunching. However, because you are trying to parallelize I/O-bound HTTP requests, you can use Python's multithreading (part of the standard library) to reduce the time required.
Also, concatenating pandas DataFrames inside a loop is a very slow operation. Instead, append each piece to a list and run a single concat after the loop finishes, as sketched below.
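A minimal sketch of the difference, with a hypothetical make_row helper standing in for the per-product work (it is not part of your code):

import pandas as pd

# Hypothetical stand-in for the per-product work (e.g. one parsed HTTP response)
def make_row(i):
    return pd.DataFrame({"product_id": [i], "property_value": [i * 2]})

# Slow: each pd.concat copies everything accumulated so far (quadratic cost)
slow = pd.DataFrame(columns=["product_id", "property_value"])
for i in range(1000):
    slow = pd.concat([slow, make_row(i)])

# Fast: collect the pieces in a list and concatenate once at the end
parts = [make_row(i) for i in range(1000)]
fast = pd.concat(parts, ignore_index=True)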
Here is how I would implement your code with multithreading:
import concurrent.futures
import threading
import time

import pandas as pd
import requests

thread_local = threading.local()

def get_session():
    # Give each thread its own Session object: requests.Session is not thread-safe
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def download_site(url, product_id):
    session = get_session()
    try:
        with session.get(url) as response:
            # Skip HTTP error responses (4xx/5xx)
            if response.ok:
                http_response_json = response.json()
                product_properties_json = http_response_json['product_properties']
                curr_product_properties_df = pd.json_normalize(product_properties_json)

                # Add the product id since it's not returned in the JSON
                curr_product_properties_df["product_id"] = product_id

                print(f"Read {len(response.content)} from {url}")

                # Return this product's properties instead of appending to a shared DataFrame
                return curr_product_properties_df
    except Exception as e:
        print(e)

def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # map() yields results in input order; the second iterable supplies the
        # positional index used as product_id. Drop failed downloads (None).
        results = executor.map(download_site, sites, range(len(sites)))
        return [df for df in results if df is not None]

if __name__ == "__main__":
    # Store the URLs as a list; replace these examples with your web service URLs
    urls = ["https://www.jython.org", "http://olympus.realpython.org/dice"] * 10
    start_time = time.time()
    all_product_properties = download_all_sites(urls)
    if all_product_properties:
        all_product_properties = pd.concat(all_product_properties, ignore_index=True)
    duration = time.time() - start_time
    print(f"Downloaded {len(urls)} in {duration} seconds")
Reference: this RealPython article on multithreading and multiprocessing in Python: https://realpython.com/python-concurrency/