如何通过 concurrent.futures 在 class 运行 中避免死锁
How to avoid deadlock inside a class running by concurrent.futures
我有一个 class 正在执行一些 I/O 绑定任务,我决定通过使用 python 并发来加快这些任务。
因此,无需进一步讨论,重要的代码部分是(删除了不必要的部分):
class toolbox:
def __init__(self, username, password, fid, settings):
#...
self.access_token = ""
self.login_data = {
"username": username,
"password": password,
"sfid" : fid
}
self.request_headers = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36",
"Accept":"application/json, text/plain, */*",
"Authorization":""
}
self.request_data = {}
def new_session(self, n=0):
session = requests.Session()
#...
if token:
self.access_token = "Bearer "+token
self.request_headers["Authorization"] = self.access_token
dreq = session.get(self.data_url, headers=self.request_headers)
if dreq.status_code==200:
self.request_data = dict( dreq.json() )
print("[ ALL SET! ]", flush=True)
else:
print(f"\n{R}[!]{end} Something went wrong, can't grab data required to start searching!", flush=True)
if n!=3:
print("[~] Trying to relogin and grab data again!", flush=True)
self.new_session(n+1)
else:
input(f"{R}[!]{end} Maximum tries exceed! There's another thing wrong can't find it! Quiting...")
sys.exit(0)
#...
#...
def check_number(self, row):
#...
if not self.Cookies:
print(f"{R}[!]{end} Creating a new session for number {number}", flush=True)
self.new_session()
#...
while True:
try:
req = requests.post(
self.search_url,
headers = self.request_headers,
json = self.request_data,
cookies = self.Cookies,
timeout = self.settings["request_timeout"]
)
if req.status_code!=200:
#...
if n==1:
n=0
print(f"{R}[!]{end} Creating a new session for number {number}", flush=True)
self.new_session()
else:
n+=1
if req.status_code!=401:
raise requests.exceptions.RequestException
#...
except requests.exceptions.RequestException as e:
#...
所以函数 check_number 在进行抓取时,如果它得到 401 状态代码,它将登录以获取新会话。那是因为 API 会话需要在每个特定时间段更新!
好的,当线程开始时问题就来了,这是在下面的函数中(在同一个 class 中):
def check_file(self, input_file):
#...
if self.settings["threaded"]:
#...
with concurrent.futures.ThreadPoolExecutor(max_workers=self.settings["threads"]) as executor:
#...
for num in executor.map(
self.check_number, manager.data_rows,
timeout=self.settings["running_timeout"],
chunksize=self.settings["save_interval"] ):
if not self.Cookies:
self.new_session()
if num:
#...
time.sleep(self.settings["throttle"])
else:
start_time = time.time()
for row in manager.data_rows:
#...
#...
程序会运行顺利地没有任何问题,直到会话结束,所有线程将同时尝试更新会话,这将造成死锁。所以问题是我如何重构代码以便在会话结束时我可以立即为所有线程更新会话而不会导致死锁?另请注意,在此程序中,当您注意到“else”语句时,用户可以决定 运行 使用设置取消线程化。
我搜索并尝试了很多东西,但我对线程不是很好,我希望任何人都可以帮助我,因为这是我工作中的问题,而不是业余项目。
提前致谢
我通过制作一个线程函数解决了这个问题,该函数每 5 秒检查一次会话是否仍然可用,如果不可用,它将更新会话,同时所有 运行ning 线程等待并每 5 秒重试一次。
这背后的代码:
class 外的装饰器到 运行 线程中的函数:
def threaded(fn):
def run(*k, **kw):
threaded_function = threading.Thread(target=fn, args=k, kwargs=kw)
threaded_function.start()
return threaded_function
return run
保持会话可行的线程函数:
@threaded
def session_keeper(self):
# This function is to make all threads use the same session to prevent deadlocks
while self.keep_threading:
time.sleep(5)
if not self.Cookies:
print(f"{G}{Bold}[SESSION]{end} Creating a new session", flush=True)
self.new_session()
continue
else:
dreq = requests.get(self.data_url, cookies = self.Cookies, headers=self.request_headers)
if dreq.status_code!=200:
print(f"{R}{Bold}[!]{end} Session ended, creating new one...", flush=True)
self.new_session()
continue
修改“check_number”函数来执行此操作:
while True:
try:
req = requests.post(
self.search_url,
headers = self.request_headers,
json = self.request_data,
cookies = self.Cookies,
timeout = self.settings["request_timeout"]
)
if req.status_code!=200:
#...
time.sleep(5)
if req.status_code!=401:
raise requests.exceptions.RequestException
else:
continue
我希望这能对某人有所帮助:)
我有一个 class 正在执行一些 I/O 绑定任务,我决定通过使用 python 并发来加快这些任务。 因此,无需进一步讨论,重要的代码部分是(删除了不必要的部分):
class toolbox:
def __init__(self, username, password, fid, settings):
#...
self.access_token = ""
self.login_data = {
"username": username,
"password": password,
"sfid" : fid
}
self.request_headers = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36",
"Accept":"application/json, text/plain, */*",
"Authorization":""
}
self.request_data = {}
def new_session(self, n=0):
session = requests.Session()
#...
if token:
self.access_token = "Bearer "+token
self.request_headers["Authorization"] = self.access_token
dreq = session.get(self.data_url, headers=self.request_headers)
if dreq.status_code==200:
self.request_data = dict( dreq.json() )
print("[ ALL SET! ]", flush=True)
else:
print(f"\n{R}[!]{end} Something went wrong, can't grab data required to start searching!", flush=True)
if n!=3:
print("[~] Trying to relogin and grab data again!", flush=True)
self.new_session(n+1)
else:
input(f"{R}[!]{end} Maximum tries exceed! There's another thing wrong can't find it! Quiting...")
sys.exit(0)
#...
#...
def check_number(self, row):
#...
if not self.Cookies:
print(f"{R}[!]{end} Creating a new session for number {number}", flush=True)
self.new_session()
#...
while True:
try:
req = requests.post(
self.search_url,
headers = self.request_headers,
json = self.request_data,
cookies = self.Cookies,
timeout = self.settings["request_timeout"]
)
if req.status_code!=200:
#...
if n==1:
n=0
print(f"{R}[!]{end} Creating a new session for number {number}", flush=True)
self.new_session()
else:
n+=1
if req.status_code!=401:
raise requests.exceptions.RequestException
#...
except requests.exceptions.RequestException as e:
#...
所以函数 check_number 在进行抓取时,如果它得到 401 状态代码,它将登录以获取新会话。那是因为 API 会话需要在每个特定时间段更新!
好的,当线程开始时问题就来了,这是在下面的函数中(在同一个 class 中):
def check_file(self, input_file):
#...
if self.settings["threaded"]:
#...
with concurrent.futures.ThreadPoolExecutor(max_workers=self.settings["threads"]) as executor:
#...
for num in executor.map(
self.check_number, manager.data_rows,
timeout=self.settings["running_timeout"],
chunksize=self.settings["save_interval"] ):
if not self.Cookies:
self.new_session()
if num:
#...
time.sleep(self.settings["throttle"])
else:
start_time = time.time()
for row in manager.data_rows:
#...
#...
程序会运行顺利地没有任何问题,直到会话结束,所有线程将同时尝试更新会话,这将造成死锁。所以问题是我如何重构代码以便在会话结束时我可以立即为所有线程更新会话而不会导致死锁?另请注意,在此程序中,当您注意到“else”语句时,用户可以决定 运行 使用设置取消线程化。
我搜索并尝试了很多东西,但我对线程不是很好,我希望任何人都可以帮助我,因为这是我工作中的问题,而不是业余项目。
提前致谢
我通过制作一个线程函数解决了这个问题,该函数每 5 秒检查一次会话是否仍然可用,如果不可用,它将更新会话,同时所有 运行ning 线程等待并每 5 秒重试一次。
这背后的代码:
class 外的装饰器到 运行 线程中的函数:
def threaded(fn):
def run(*k, **kw):
threaded_function = threading.Thread(target=fn, args=k, kwargs=kw)
threaded_function.start()
return threaded_function
return run
保持会话可行的线程函数:
@threaded
def session_keeper(self):
# This function is to make all threads use the same session to prevent deadlocks
while self.keep_threading:
time.sleep(5)
if not self.Cookies:
print(f"{G}{Bold}[SESSION]{end} Creating a new session", flush=True)
self.new_session()
continue
else:
dreq = requests.get(self.data_url, cookies = self.Cookies, headers=self.request_headers)
if dreq.status_code!=200:
print(f"{R}{Bold}[!]{end} Session ended, creating new one...", flush=True)
self.new_session()
continue
修改“check_number”函数来执行此操作:
while True:
try:
req = requests.post(
self.search_url,
headers = self.request_headers,
json = self.request_data,
cookies = self.Cookies,
timeout = self.settings["request_timeout"]
)
if req.status_code!=200:
#...
time.sleep(5)
if req.status_code!=401:
raise requests.exceptions.RequestException
else:
continue
我希望这能对某人有所帮助:)