Python: fire dynamic URLs using multithreading
I am new to Python threading. I have read many posts, but I still don't really understand how to use it. Nevertheless, I tried to complete my task, and I want to check whether I am doing it the right way.

The task is: read a big CSV containing around 20K records, take the ID from each record, and fire an HTTP API call for every record in the CSV.
import csv
import threading
import time

import requests

t1 = time.time()
file_data_obj = csv.DictReader(open(file_path, 'rU'))
threads = []
for record in file_data_obj:
    apiurl = "https://www.api-server.com?id=" + record.get("acc_id", "")
    thread = threading.Thread(target=requests.get, args=(apiurl,))
    thread.start()
    threads.append(thread)
t2 = time.time()
for thread in threads:
    thread.join()

print("Total time required to process a file - {} Secs".format(t2 - t1))
- As there are 20K records, will it start 20K threads, or will the OS/Python handle it? If yes, can we restrict it?
- How can I collect the responses returned by requests.get?
- Will t2 - t1 really give me the time required to process the whole file?
As there are 20K records, will it start 20K threads, or will the OS/Python handle it? If yes, can we restrict it?
Yes - a new thread is started on every iteration. The maximum number of threads depends on your OS.
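The threading module itself has no built-in cap on the thread count. One way to bound concurrency (not part of the original answer; MAX_CONCURRENT is an assumed value) is a semaphore. Note that this still creates one thread per record and only limits how many requests are in flight at once - the worker pools shown below avoid creating the extra threads entirely:

import threading

import requests

MAX_CONCURRENT = 50  # assumed cap - tune it for your API and OS limits

sem = threading.BoundedSemaphore(MAX_CONCURRENT)

def fetch(apiurl):
    # Blocks while MAX_CONCURRENT requests are already in flight
    with sem:
        return requests.get(apiurl)

threads = []
for record in file_data_obj:
    apiurl = "https://www.api-server.com?id=" + record.get("acc_id", "")
    t = threading.Thread(target=fetch, args=(apiurl,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()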
How can I grab the response returned by requests.get?
If you only want to use the threading module, you have to use a Queue. Threads return None by design, so you have to set up a line of communication between each Thread and your main loop yourself.
from queue import Queue
from threading import Thread

import requests

# Each thread pushes its response onto a shared queue
q = Queue()

def return_get(q, apiurl):
    q.put(requests.get(apiurl))

threads = []
for record in file_data_obj:
    apiurl = "https://www.api-server.com?id=" + record.get("acc_id", "")
    t = Thread(target=return_get, args=(q, apiurl))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

while not q.empty():
    r = q.get()  # Fetches the next item from the queue
    print(r.text)
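One note on this pattern: draining the queue with q.empty() is only reliable here because every producer thread has been joined first; while producers are still running, an empty check can race with a pending put().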
Another approach is to use a worker pool.
from concurrent.futures import ThreadPoolExecutor

import requests

futures = []
pool = ThreadPoolExecutor(10)  # at most 10 requests run at the same time

# Submit work to the pool
for record in file_data_obj:
    apiurl = "https://www.api-server.com?id=" + record.get("acc_id", "")
    futures.append(pool.submit(requests.get, apiurl))

for future in futures:
    print(future.result().text)
You can use ThreadPoolExecutor to retrieve a single page and report the URL and its contents:
import concurrent.futures
import urllib.request

# Retrieve a single page and return its contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
Create a pool executor with N workers:
with concurrent.futures.ThreadPoolExecutor(max_workers=N_workers) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
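Putting the pieces together for the original task, here is a minimal sketch (not from the original answer; file_path and the acc_id URL come from the question, and max_workers=20 is an assumed value). It also addresses the third question: t2 must be taken after all the work has finished - in the question's code it was captured before the threads were joined, so t2 - t1 only measured how long it took to start them.

import concurrent.futures
import csv
import time

import requests

t1 = time.time()

with open(file_path, newline="") as f:
    records = list(csv.DictReader(f))

urls = ["https://www.api-server.com?id=" + r.get("acc_id", "") for r in records]

# The with-block only exits once every submitted future has completed
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    future_to_url = {executor.submit(requests.get, url, timeout=60): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            resp = future.result()
        except Exception as exc:
            print('%r failed: %s' % (url, exc))
        else:
            print('%r returned HTTP %d' % (url, resp.status_code))

t2 = time.time()  # now measures the full run, including all the requests
print("Total time required to process a file - {} Secs".format(t2 - t1))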