Python multi-threading method
I've heard that Python multi-threading is a bit tricky, and I'm not sure what the best way is to implement what I need. Say I have a function called IO_intensive_function that does some API call and may take a while to get a response.
Say the process of queuing jobs could look like this:
import thread  # note: this low-level module was renamed _thread in Python 3
for job_args in jobs:
    thread.start_new_thread(IO_intensive_function, (job_args,))  # args must be a tuple
Now, will IO_intensive_function carry out its task in the background and allow me to queue up more jobs?
I also looked at this question, and it seems the approach is just to do the following:
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(2)
results = pool.map(IO_intensive_function, jobs)
Since I don't need these tasks to communicate with each other, and the only goal is to send my API requests as fast as possible, is this the most efficient way? Thanks.
Edit: The way I am making the API requests is through a Thrift service.
For network API requests you can use asyncio. Have a look at this article https://realpython.com/python-concurrency/#asyncio-version for an example of how to implement it.
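As a rough illustration of that pattern, here is a minimal sketch assuming the third-party aiohttp package is installed and that the URLs and the fetch_one helper are placeholders for your real API calls:

import asyncio
import aiohttp

async def fetch_one(session, url):
    # each request awaits the network without blocking the other requests
    async with session.get(url) as resp:
        return await resp.json()

async def fetch_all(urls):
    # one shared session; gather() runs all the requests concurrently
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch_one(session, u) for u in urls))

results = asyncio.run(fetch_all(["https://api.example.com/item/1",
                                 "https://api.example.com/item/2"]))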
I recently had to write code to do something similar. I have tried to make it generic below. Note that I am a novice coder, so please forgive the inelegance. What you may find valuable, however, is some of the error handling I found necessary to embed in order to catch disconnects and the like.
I also found it valuable to do the JSON processing in a threaded manner: the threads are already working for you, so why handle that processing step "serially" again when you can extract the information in parallel (this happens inside call_API below).
I may have introduced errors while making it generic. Please don't hesitate to ask follow-up questions and I will clarify.
import time
import requests
from multiprocessing.dummy import Pool as ThreadPool
from src_code.config import Config

# read the API key once; the with-block closes the file automatically
with open(Config.API_PATH + '/api_security_key.pem') as f:
    my_key = f.read().rstrip("\n")

base_url = "https://api.my_api_destination.com/v1"
headers = {"Authorization": "Bearer %s" % my_key}

# shared arguments that are passed to every worker call
itm = list()
itm.append(base_url)
itm.append(headers)

def call_API(call_var):
    base_url = call_var[0]
    headers = call_var[1]
    call_specific_tag = call_var[2]
    endpoint = f'/api_path/{call_specific_tag}'

    # retry up to three times, pausing after each failed connection
    connection_tries = 0
    for i in range(3):
        try:
            dat = requests.get((base_url + endpoint), headers=headers).json()
        except:
            connection_tries += 1
            print(f'Call for {call_specific_tag} failed after {i + 1} attempt(s). Pausing for 240 seconds.')
            time.sleep(240)
        else:
            break

    # unpack the JSON inside the worker so that parsing also happens in parallel
    vars_to_capture_01 = list()
    vars_to_capture_02 = list()
    connection_tries = 0
    try:
        if 'record_id' in dat:
            vars_to_capture_01.append(dat['record_id'])
            vars_to_capture_02.append(dat['second_item_of_interest'])
        else:
            vars_to_capture_01.append(call_specific_tag)
            print(f'Call specific tag {call_specific_tag} is unavailable. Successful pull.')
            vars_to_capture_02.append(-1)
    except:
        print(f'{call_specific_tag} is unavailable. Unsuccessful pull.')
        vars_to_capture_01.append(call_specific_tag)
        vars_to_capture_02.append(-1)
        time.sleep(240)

    pack = list()
    pack.append(vars_to_capture_01)
    pack.append(vars_to_capture_02)
    return pack

vars_to_capture_01 = list()
vars_to_capture_02 = list()

# work through all_tags (the list of call-specific tags to pull) in batches of up to 10 threads
i = 0
max_i = len(all_tags)
while i < max_i:
    ind_rng = range(i, min((i + 10), (max_i)), 1)
    itm_lst = (itm.copy())
    call_var = [itm_lst + [all_tags[q]] for q in ind_rng]
    # packed = call_API(call_var[0])  # for testing the function without pooling
    pool = ThreadPool(len(call_var))
    packed = pool.map(call_API, call_var)
    pool.close()
    pool.join()
    for pack in packed:
        try:
            vars_to_capture_01.append(pack[0][0])
        except:
            print(f'Unpacking error for {all_tags[i]}.')
        vars_to_capture_02.append(pack[1][0])
    i += len(ind_rng)  # advance to the next batch of tags
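For comparison only, the batching loop above can also be expressed with the standard-library concurrent.futures module, which manages a fixed pool of worker threads for you; this is just a sketch, assuming call_API, itm and all_tags are defined as above:

from concurrent.futures import ThreadPoolExecutor

# a fixed pool of 10 worker threads processes every tag; map() preserves input order
with ThreadPoolExecutor(max_workers=10) as executor:
    packed = list(executor.map(call_API, [itm + [tag] for tag in all_tags]))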