Python - 如何使以下函数线程化?
Python - How do I make the below function threaded?
我对 python 和一般编程还很陌生 - 正在寻找有关如何加强以下功能并节省一些时间的建议。一些背景信息:
要求是我在给定的顶级文件夹中收集每个 sub-file/folder 的名称和 ID。问题是,我从中请求数据的服务器只会 return 单个文件夹的内容,并且响应将始终指定 returned 对象是文件还是文件夹。
(伪代码示例,只是为了快速演示):
Top_level_folderid = 1111
url = "fileserverapi.couldbebetter.com/thismighttakeawhile"
post(url, data=Top_level_folderid)
response({"jim's folder" : id=1234, filetype=folder}, {"weird_video.mp4" : id=4321, filetype=file})
然后我必须遍历每个响应并 post 返回服务器以获取下一组。在某些情况下,整个顶级文件夹可能包含多达 15,000 个文件夹和 30,000 多个随机分布的文件。一些文件夹包含 1 个文件和 15 个文件夹,另一些文件夹包含 7 个文件和 2 个子文件夹,等等。
API 本身响应很快,但我不知道它可以处理多少个并发连接,所以我需要能够调整并在函数中找到最佳点,我猜它会处理 10-50 的任何地方。我现在的功能:
def drill_folder_loop(folder_list, project_id, directory, jsession_id):
    """Breadth-first traversal of a remote folder tree.

    For each folder, lists its contents via the API, commits file data to
    the DB (process_folder_files) and runs recipient processing, then
    descends into whatever sub-folders were returned.

    Args:
        folder_list: list of single-entry dicts, [{folder_name: folder_id}, ...].
        project_id: opaque project identifier forwarded to recipient_process.
        directory: destination passed through to the processing helpers.
        jsession_id: API session token passed to list_folders_files.

    Returns:
        Whatever log_info("Folder loop complete") returns.
    """
    current_level = folder_list
    while current_level:
        next_level = []
        for folder in current_level:
            # Non-destructive read of the single {name: id} pair; works on
            # Python 2 and 3 (the original dict.keys()[0] is Python-2-only).
            f_name, f_id = next(iter(folder.items()))
            # list_folders_files posts to the API and returns the folder's
            # contents as a list of dicts shaped like folder_list.
            folder_dict = list_folders_files(f_id, jsession_id)
            # Commits file data to the DB and returns the sub-folders found.
            sub_folders = process_folder_files(folder_dict, directory, jsession_id)
            recipient_process(folder_dict, "no", f_id, directory, project_id)
            next_level.extend(sub_folders)
        # Loop ends naturally once a level yields no sub-folders; no manual
        # list copying or sentinel counter needed.
        current_level = next_level
    return log_info("Folder loop complete")
重新阅读这是关于变量命名的一个很好的教训......并不是最简洁的......代码本身工作正常但你现在可能已经想象到,它需要很长时间才能完成数以千计的文件夹...关于如何将其变成 multi-threaded/processing 野兽的任何 advice/direction?感谢您抽时间阅读!
编辑:
为清楚起见,我不想在循环中处理文件夹,而是在线程中分配任务以拥有多个文件夹,因此 post 请求和响应同时发生,这样整个过程花费的时间更少。现在它一次只循环一个文件夹..希望这能澄清..
编辑:
根据 Noctis Skytower 的建议,我做了一些小改动以支持 python 2.7(Queue 与 queue 和 .clock() 而不是 perf_counter())。太近了!我 运行 遇到的问题是,当我将 运行 线程更改为 1 时,它完美地完成了 - 当我出于某种原因(随机)将其增加到 25 时,变量 f_id 在 dfl_worker() 中是 None。鉴于它适用于 1 个线程,我猜这不是建议的问题,而是我的代码中的其他问题,所以我将其标记为已接受。谢谢!
class ThreadPool:
    # Python 2 adaptation of the answer's ThreadPool below: `Queue` module
    # instead of `queue`, and `clock()` (Py2 time.clock, imported elsewhere
    # in the file) instead of time.perf_counter().
    def __init__(self, count, timeout, daemonic):
        """Start `count` worker threads; each exits after `timeout` idle seconds."""
        self.__busy = 0        # number of workers currently executing a job
        self.__idle = clock()  # timestamp of the last completed job
        self.__jobs = Queue()  # pending (function, args, kwargs) tuples
        self.__lock = Lock()   # guards __busy and __idle
        self.__pool = []       # the worker Thread objects
        self.__timeout = timeout
        for _ in range(count):
            thread = Thread(target=self.__worker)
            thread.daemon = daemonic
            thread.start()
            self.__pool.append(thread)

    def __worker(self):
        """Worker loop: run queued jobs until idle for `timeout` seconds."""
        while True:
            try:
                function, args, kwargs = self.__jobs.get(True, 0.1)
            except Empty:
                with self.__lock:
                    if self.__busy:
                        continue  # another worker is mid-job; stay alive
                    if clock() - self.__idle < self.__timeout:
                        continue  # not idle long enough yet
                    break  # idle past the timeout: let this worker exit
            else:
                with self.__lock:
                    self.__busy += 1
                try:
                    function(*args, **kwargs)
                except:
                    # Deliberately swallow job errors so one bad job cannot
                    # kill a worker thread (failures are silent, though).
                    pass
                with self.__lock:
                    self.__busy -= 1
                    self.__idle = clock()

    def apply(self, function, *args, **kwargs):
        """Queue function(*args, **kwargs); raise if every worker has died."""
        self.__pool = list(filter(Thread.is_alive, self.__pool))
        if not self.__pool:
            raise RuntimeError('ThreadPool has no running Threads')
        self.__jobs.put((function, args, kwargs))

    def join(self):
        """Block until every worker thread has exited."""
        for thread in self.__pool:
            thread.join()
def drill_folder_loop(folder_list, project_id, directory, jsession_id):
    """Threaded entry point: fan the folder traversal out over a 25-thread pool."""
    pool = ThreadPool(25, 1, False)
    # The pool itself is passed to the worker so it can re-schedule
    # sub-folders as further jobs.
    pool.apply(dfl_worker, pool, folder_list, project_id, directory, jsession_id)
    pool.join()  # block until the queue drains and the workers time out
def dfl_worker(tp, folder_list, project_id, directory, jsession_id):
    """Pool job: process one batch of folders, re-scheduling sub-folders.

    Args:
        tp: the ThreadPool, used to schedule sub-folder batches as new jobs
            so other threads can pick them up concurrently.
        folder_list: list of single-entry dicts [{folder_name: folder_id}, ...].
        project_id, directory, jsession_id: passed through to the helpers.
    """
    for folder in folder_list:
        # Read the single {name: id} pair without mutating the dict
        # (the original dict.keys()[0] / .values()[0] is Python-2-only).
        f_name, f_id = next(iter(folder.items()))
        f_dict = list_folders_files(f_id, jsession_id)
        f_list = process_folder_files(f_dict, directory, jsession_id)
        # Schedule the sub-folders before recipient processing, matching
        # the original ordering, so other workers start on them sooner.
        tp.apply(dfl_worker, tp, f_list, project_id, directory, jsession_id)
        recipient_process(f_dict, 'no', f_id, directory, project_id)
    log_info('One folder processed')
理解你想用这段代码做什么有点棘手。
据我所知,您想使这段代码成为多线程;要实现这一点,你需要找到一个可以相互独立执行的 routine/taskset
然后你可以将它从循环中取出并为其创建一个独立的函数。
def task(someparams):
    """Placeholder worker routine: put the per-item work here.

    The original body was only a comment, which is a SyntaxError in
    Python; a docstring/pass makes the stub valid.
    """
    # mytasks using the required someparams
    pass
现在您可以创建一组工作线程并将工作分配给 运行 task
例程并完成您的工作。
此处如何对 task
例程进行多线程处理:
import thread  # Python 2 only; on Python 3 use the threading module instead

WORKER_THREAD = 10

c = 0
while c < WORKER_THREAD:
    thread.start_new_thread(task, (someparams,))
    c += 1  # bug fix: the original never incremented c, spawning threads forever

# Keep the main thread alive: threads started with the low-level `thread`
# module die when the main thread exits. NOTE(review): a busy-wait burns a
# full CPU core; joining threading.Thread objects would be better.
while 1:
    pass
我可以推荐以下吗?
from queue import Empty, Queue
from threading import Lock, Thread
from time import perf_counter
def drill_folder_loop(folder_list, project_id, directory, jsession_id):
    """Single-threaded rewrite: breadth-first walk of the folder tree.

    Processes every folder in the current level, collecting returned
    sub-folders into the next level, until no sub-folders remain.

    Args:
        folder_list: list of single-entry dicts [{folder_name: folder_id}, ...].
        project_id, directory, jsession_id: forwarded to the helpers.

    Returns:
        Whatever log_info('Folder loop complete') returns.
    """
    while True:
        next_folder_list = []
        for folder in folder_list:
            # next(iter(...)) reads the single {name: id} pair WITHOUT
            # emptying the dict — popitem() destroys the caller's data.
            f_name, f_id = next(iter(folder.items()))
            f_dict = list_folders_files(f_id, jsession_id)
            f_list = process_folder_files(f_dict, directory, jsession_id)
            recipient_process(f_dict, 'no', f_id, directory, project_id)
            next_folder_list.extend(f_list)
        if not next_folder_list:
            break
        folder_list = next_folder_list
    return log_info('Folder loop complete')
###############################################################################
class ThreadPool:
    """A small pool of worker threads fed from a shared job queue.

    Workers pull (function, args, kwargs) jobs off an internal queue and
    run them. An idle worker exits once the queue has stayed empty for
    `timeout` seconds while no other worker is mid-job, so join() returns
    shortly after the last job finishes.
    """

    def __init__(self, count, timeout, daemonic):
        self.__active = 0                  # workers currently running a job
        self.__last_done = perf_counter()  # when the pool last finished a job
        self.__queue = Queue()             # pending (function, args, kwargs) jobs
        self.__guard = Lock()              # protects __active / __last_done
        self.__workers = []
        self.__timeout = timeout
        for _ in range(count):
            worker = Thread(target=self.__worker)
            worker.daemon = daemonic
            worker.start()
            self.__workers.append(worker)

    def __worker(self):
        """Worker loop: run jobs until idle for `timeout` seconds."""
        while True:
            try:
                job, positional, keyword = self.__queue.get(True, 0.1)
            except Empty:
                with self.__guard:
                    idle_for = perf_counter() - self.__last_done
                    # Exit only when nobody is busy and the idle window has
                    # fully elapsed; otherwise keep polling the queue.
                    if not self.__active and idle_for >= self.__timeout:
                        break
                continue
            with self.__guard:
                self.__active += 1
            try:
                job(*positional, **keyword)
            except:  # noqa: E722 -- deliberately swallow job errors, as the original does
                pass
            with self.__guard:
                self.__active -= 1
                self.__last_done = perf_counter()

    def apply(self, function, *args, **kwargs):
        """Queue function(*args, **kwargs); raise if every worker has died."""
        self.__workers = [t for t in self.__workers if t.is_alive()]
        if not self.__workers:
            raise RuntimeError('ThreadPool has no running Threads')
        self.__queue.put((function, args, kwargs))

    def join(self):
        """Block until every worker thread has exited."""
        for worker in self.__workers:
            worker.join()
def drill_folder_loop(folder_list, project_id, directory, jsession_id):
    """Threaded entry point: process the folder tree on a 25-thread pool."""
    pool = ThreadPool(25, 1, False)
    # The pool is handed to the worker so sub-folders can be re-queued.
    pool.apply(dfl_worker, pool, folder_list, project_id, directory, jsession_id)
    pool.join()  # returns once the pool drains and its threads time out
def dfl_worker(tp, folder_list, project_id, directory, jsession_id):
    """Pool job: process one batch of folders, scheduling sub-folders as new jobs.

    Args:
        tp: the ThreadPool; sub-folder batches are re-queued on it so other
            threads can process them concurrently.
        folder_list: list of single-entry dicts [{folder_name: folder_id}, ...].
        project_id, directory, jsession_id: forwarded to the helpers.
    """
    for folder in folder_list:
        # popitem() empties the dict as a side effect; next(iter(...)) reads
        # the single {name: id} pair without mutating shared state, which
        # matters once many threads handle the same structures.
        f_name, f_id = next(iter(folder.items()))
        f_dict = list_folders_files(f_id, jsession_id)
        f_list = process_folder_files(f_dict, directory, jsession_id)
        # Queue the sub-folders before recipient processing (same ordering
        # as the original) so other workers can start on them sooner.
        tp.apply(dfl_worker, tp, f_list, project_id, directory, jsession_id)
        recipient_process(f_dict, 'no', f_id, directory, project_id)
    log_info('One folder processed')
第一个 drill_folder_loop
是对你的函数的重写,应该做同样的事情,但第二个版本应该利用 ThreadPool
class 以便你的文件夹列表可以被处理最多 25 个线程并发。请注意,线程版本不会返回任何重要内容,如果 tp.join()
最后被删除,它几乎会在执行后立即 returns。
我对 python 和一般编程还很陌生 - 正在寻找有关如何加强以下功能并节省一些时间的建议。一些背景信息:
要求是我在给定的顶级文件夹中收集每个 sub-file/folder 的名称和 ID。问题是,我从中请求数据的服务器只会 return 单个文件夹的内容,并且响应将始终指定 returned 对象是文件还是文件夹。
(伪代码示例,只是为了快速演示):
Top_level_folderid = 1111
url = "fileserverapi.couldbebetter.com/thismighttakeawhile"
post(url, data=Top_level_folderid)
response({"jim's folder" : id=1234, filetype=folder}, {"weird_video.mp4" : id=4321, filetype=file})
然后我必须遍历每个响应并 post 返回服务器以获取下一组。在某些情况下,整个顶级文件夹可能包含多达 15,000 个文件夹和 30,000 多个随机分布的文件。一些文件夹包含 1 个文件和 15 个文件夹,另一些文件夹包含 7 个文件和 2 个子文件夹,等等。
API 本身响应很快,但我不知道它可以处理多少个并发连接,所以我需要能够调整并在函数中找到最佳点,我猜它会处理 10-50 的任何地方。我现在的功能:
def drill_folder_loop(folder_list, project_id, directory, jsession_id):
    """Breadth-first traversal of a remote folder tree.

    For each folder, lists its contents via the API, commits file data to
    the DB (process_folder_files) and runs recipient processing, then
    descends into whatever sub-folders were returned.

    Args:
        folder_list: list of single-entry dicts, [{folder_name: folder_id}, ...].
        project_id: opaque project identifier forwarded to recipient_process.
        directory: destination passed through to the processing helpers.
        jsession_id: API session token passed to list_folders_files.

    Returns:
        Whatever log_info("Folder loop complete") returns.
    """
    current_level = folder_list
    while current_level:
        next_level = []
        for folder in current_level:
            # Non-destructive read of the single {name: id} pair; works on
            # Python 2 and 3 (the original dict.keys()[0] is Python-2-only).
            f_name, f_id = next(iter(folder.items()))
            # list_folders_files posts to the API and returns the folder's
            # contents as a list of dicts shaped like folder_list.
            folder_dict = list_folders_files(f_id, jsession_id)
            # Commits file data to the DB and returns the sub-folders found.
            sub_folders = process_folder_files(folder_dict, directory, jsession_id)
            recipient_process(folder_dict, "no", f_id, directory, project_id)
            next_level.extend(sub_folders)
        # Loop ends naturally once a level yields no sub-folders; no manual
        # list copying or sentinel counter needed.
        current_level = next_level
    return log_info("Folder loop complete")
重新阅读这是关于变量命名的一个很好的教训......并不是最简洁的......代码本身工作正常但你现在可能已经想象到,它需要很长时间才能完成数以千计的文件夹...关于如何将其变成 multi-threaded/processing 野兽的任何 advice/direction?感谢您抽时间阅读!
编辑:
为清楚起见,我不想在循环中处理文件夹,而是在线程中分配任务以拥有多个文件夹,因此 post 请求和响应同时发生,这样整个过程花费的时间更少。现在它一次只循环一个文件夹..希望这能澄清..
编辑: 根据 Noctis Skytower 的建议,我做了一些小改动以支持 python 2.7(Queue 与 queue 和 .clock() 而不是 perf_counter())。太近了!我 运行 遇到的问题是,当我将 运行 线程更改为 1 时,它完美地完成了 - 当我出于某种原因(随机)将其增加到 25 时,变量 f_id 在 dfl_worker() 中是 None。鉴于它适用于 1 个线程,我猜这不是建议的问题,而是我的代码中的其他问题,所以我将其标记为已接受。谢谢!
class ThreadPool:
    # Python 2 adaptation of the answer's ThreadPool below: `Queue` module
    # instead of `queue`, and `clock()` (Py2 time.clock, imported elsewhere
    # in the file) instead of time.perf_counter().
    def __init__(self, count, timeout, daemonic):
        """Start `count` worker threads; each exits after `timeout` idle seconds."""
        self.__busy = 0        # number of workers currently executing a job
        self.__idle = clock()  # timestamp of the last completed job
        self.__jobs = Queue()  # pending (function, args, kwargs) tuples
        self.__lock = Lock()   # guards __busy and __idle
        self.__pool = []       # the worker Thread objects
        self.__timeout = timeout
        for _ in range(count):
            thread = Thread(target=self.__worker)
            thread.daemon = daemonic
            thread.start()
            self.__pool.append(thread)

    def __worker(self):
        """Worker loop: run queued jobs until idle for `timeout` seconds."""
        while True:
            try:
                function, args, kwargs = self.__jobs.get(True, 0.1)
            except Empty:
                with self.__lock:
                    if self.__busy:
                        continue  # another worker is mid-job; stay alive
                    if clock() - self.__idle < self.__timeout:
                        continue  # not idle long enough yet
                    break  # idle past the timeout: let this worker exit
            else:
                with self.__lock:
                    self.__busy += 1
                try:
                    function(*args, **kwargs)
                except:
                    # Deliberately swallow job errors so one bad job cannot
                    # kill a worker thread (failures are silent, though).
                    pass
                with self.__lock:
                    self.__busy -= 1
                    self.__idle = clock()

    def apply(self, function, *args, **kwargs):
        """Queue function(*args, **kwargs); raise if every worker has died."""
        self.__pool = list(filter(Thread.is_alive, self.__pool))
        if not self.__pool:
            raise RuntimeError('ThreadPool has no running Threads')
        self.__jobs.put((function, args, kwargs))

    def join(self):
        """Block until every worker thread has exited."""
        for thread in self.__pool:
            thread.join()
def drill_folder_loop(folder_list, project_id, directory, jsession_id):
    """Threaded entry point: fan the folder traversal out over a 25-thread pool."""
    pool = ThreadPool(25, 1, False)
    # The pool itself is passed to the worker so it can re-schedule
    # sub-folders as further jobs.
    pool.apply(dfl_worker, pool, folder_list, project_id, directory, jsession_id)
    pool.join()  # block until the queue drains and the workers time out
def dfl_worker(tp, folder_list, project_id, directory, jsession_id):
    """Pool job: process one batch of folders, re-scheduling sub-folders.

    Args:
        tp: the ThreadPool, used to schedule sub-folder batches as new jobs
            so other threads can pick them up concurrently.
        folder_list: list of single-entry dicts [{folder_name: folder_id}, ...].
        project_id, directory, jsession_id: passed through to the helpers.
    """
    for folder in folder_list:
        # Read the single {name: id} pair without mutating the dict
        # (the original dict.keys()[0] / .values()[0] is Python-2-only).
        f_name, f_id = next(iter(folder.items()))
        f_dict = list_folders_files(f_id, jsession_id)
        f_list = process_folder_files(f_dict, directory, jsession_id)
        # Schedule the sub-folders before recipient processing, matching
        # the original ordering, so other workers start on them sooner.
        tp.apply(dfl_worker, tp, f_list, project_id, directory, jsession_id)
        recipient_process(f_dict, 'no', f_id, directory, project_id)
    log_info('One folder processed')
理解你想用这段代码做什么有点棘手。
据我所知,您想使这段代码成为多线程;要实现这一点,你需要找到一个可以相互独立执行的 routine/taskset
然后你可以将它从循环中取出并为其创建一个独立的函数。
def task(someparams):
    """Placeholder worker routine: put the per-item work here.

    The original body was only a comment, which is a SyntaxError in
    Python; a docstring/pass makes the stub valid.
    """
    # mytasks using the required someparams
    pass
现在您可以创建一组工作线程并将工作分配给 运行 task
例程并完成您的工作。
此处如何对 task
例程进行多线程处理:
import thread  # Python 2 only; on Python 3 use the threading module instead

WORKER_THREAD = 10

c = 0
while c < WORKER_THREAD:
    thread.start_new_thread(task, (someparams,))
    c += 1  # bug fix: the original never incremented c, spawning threads forever

# Keep the main thread alive: threads started with the low-level `thread`
# module die when the main thread exits. NOTE(review): a busy-wait burns a
# full CPU core; joining threading.Thread objects would be better.
while 1:
    pass
我可以推荐以下吗?
from queue import Empty, Queue
from threading import Lock, Thread
from time import perf_counter
def drill_folder_loop(folder_list, project_id, directory, jsession_id):
    """Single-threaded rewrite: breadth-first walk of the folder tree.

    Processes every folder in the current level, collecting returned
    sub-folders into the next level, until no sub-folders remain.

    Args:
        folder_list: list of single-entry dicts [{folder_name: folder_id}, ...].
        project_id, directory, jsession_id: forwarded to the helpers.

    Returns:
        Whatever log_info('Folder loop complete') returns.
    """
    while True:
        next_folder_list = []
        for folder in folder_list:
            # next(iter(...)) reads the single {name: id} pair WITHOUT
            # emptying the dict — popitem() destroys the caller's data.
            f_name, f_id = next(iter(folder.items()))
            f_dict = list_folders_files(f_id, jsession_id)
            f_list = process_folder_files(f_dict, directory, jsession_id)
            recipient_process(f_dict, 'no', f_id, directory, project_id)
            next_folder_list.extend(f_list)
        if not next_folder_list:
            break
        folder_list = next_folder_list
    return log_info('Folder loop complete')
###############################################################################
class ThreadPool:
    """A small pool of worker threads fed from a shared job queue.

    Workers pull (function, args, kwargs) jobs off an internal queue and
    run them. An idle worker exits once the queue has stayed empty for
    `timeout` seconds while no other worker is mid-job, so join() returns
    shortly after the last job finishes.
    """

    def __init__(self, count, timeout, daemonic):
        self.__active = 0                  # workers currently running a job
        self.__last_done = perf_counter()  # when the pool last finished a job
        self.__queue = Queue()             # pending (function, args, kwargs) jobs
        self.__guard = Lock()              # protects __active / __last_done
        self.__workers = []
        self.__timeout = timeout
        for _ in range(count):
            worker = Thread(target=self.__worker)
            worker.daemon = daemonic
            worker.start()
            self.__workers.append(worker)

    def __worker(self):
        """Worker loop: run jobs until idle for `timeout` seconds."""
        while True:
            try:
                job, positional, keyword = self.__queue.get(True, 0.1)
            except Empty:
                with self.__guard:
                    idle_for = perf_counter() - self.__last_done
                    # Exit only when nobody is busy and the idle window has
                    # fully elapsed; otherwise keep polling the queue.
                    if not self.__active and idle_for >= self.__timeout:
                        break
                continue
            with self.__guard:
                self.__active += 1
            try:
                job(*positional, **keyword)
            except:  # noqa: E722 -- deliberately swallow job errors, as the original does
                pass
            with self.__guard:
                self.__active -= 1
                self.__last_done = perf_counter()

    def apply(self, function, *args, **kwargs):
        """Queue function(*args, **kwargs); raise if every worker has died."""
        self.__workers = [t for t in self.__workers if t.is_alive()]
        if not self.__workers:
            raise RuntimeError('ThreadPool has no running Threads')
        self.__queue.put((function, args, kwargs))

    def join(self):
        """Block until every worker thread has exited."""
        for worker in self.__workers:
            worker.join()
def drill_folder_loop(folder_list, project_id, directory, jsession_id):
    """Threaded entry point: process the folder tree on a 25-thread pool."""
    pool = ThreadPool(25, 1, False)
    # The pool is handed to the worker so sub-folders can be re-queued.
    pool.apply(dfl_worker, pool, folder_list, project_id, directory, jsession_id)
    pool.join()  # returns once the pool drains and its threads time out
def dfl_worker(tp, folder_list, project_id, directory, jsession_id):
    """Pool job: process one batch of folders, scheduling sub-folders as new jobs.

    Args:
        tp: the ThreadPool; sub-folder batches are re-queued on it so other
            threads can process them concurrently.
        folder_list: list of single-entry dicts [{folder_name: folder_id}, ...].
        project_id, directory, jsession_id: forwarded to the helpers.
    """
    for folder in folder_list:
        # popitem() empties the dict as a side effect; next(iter(...)) reads
        # the single {name: id} pair without mutating shared state, which
        # matters once many threads handle the same structures.
        f_name, f_id = next(iter(folder.items()))
        f_dict = list_folders_files(f_id, jsession_id)
        f_list = process_folder_files(f_dict, directory, jsession_id)
        # Queue the sub-folders before recipient processing (same ordering
        # as the original) so other workers can start on them sooner.
        tp.apply(dfl_worker, tp, f_list, project_id, directory, jsession_id)
        recipient_process(f_dict, 'no', f_id, directory, project_id)
    log_info('One folder processed')
第一个 drill_folder_loop
是对你的函数的重写,应该做同样的事情,但第二个版本应该利用 ThreadPool
class 以便你的文件夹列表可以被处理最多 25 个线程并发。请注意,线程版本不会返回任何重要内容,如果 tp.join()
最后被删除,它几乎会在执行后立即 returns。