创建一个线程安全的队列平衡器
Creating a thread-safe queue balancer
我的项目涉及为客户批量处理图像。客户端发送压缩的图像文件,这会为每个图像触发 ImageMagick 命令行脚本。我试图解决的问题是,如果这些命令按照我收到它们的顺序排队,那么需要处理 10k 图像的客户端将占用所有资源数小时。我的解决方案是循环每个客户端的队列,这样每个人都可以平等地减慢彼此的速度。我创建了这个 class 来实现这个:
class QueueBalancer():
def __init__(self, cycle_list=[]):
self.cycle_list = cycle_list
self.update_status()
def cmd_gen(self):
index = -1
while True:
try:
if self.cycle_list:
self.processing = True
index += 1
commands = self.cycle_list[index]["commands"]
if commands:
command = commands.pop(0)
if len(commands) == 0:
del self.cycle_list[index]
index -= 1
self.update_status()
yield command
else:
yield None
except IndexError:
index = -1
def get_user(self, user):
return next((item for item in self.cycle_list[0] if item["user"] == user), None)
def create_or_append(self, user, commands):
existing_user = self.get_user(user)
if existing_user:
index = self.cycle_list.index(existing_user)
self.cycle_list[index]["commands"] += commands
else:
self.cycle_list += [{
"user" : user,
"commands" : commands
}]
def update_status(self):
if next((item for item in self.cycle_list if item["commands"] != []), None):
self.processing = True
else:
self.processing = False
def status(self):
return self.processing
从[=12=的else子句可以看出,cycle_list
是一个这样的字典列表:
{"user": "test1", "commands": ["command1", "command2"]},
{"user": "test2", "commands": ["x", "y", "z"]},
{"user": "test3", "commands": ["a", "b", "c"]}
(删除了真实命令,使用了示例字符串)
cmd_gen()
的单个实例将用于将命令馈送到我的 shell,我将使用 create_or_append()
即时添加用户和命令,同时队列中的命令仍在处理中。到目前为止,这在我的初始测试中似乎工作得很好,但这在理论上是线程安全的吗?如果不是,我需要做什么来确保它是?
我对以下部分的线程安全有疑问:
def create_or_append(self, user, commands):
existing_user = self.get_user(user)
if existing_user:
index = self.cycle_list.index(existing_user)
self.cycle_list[index]["commands"] += commands
else:
self.cycle_list += [{
"user" : user,
"commands" : commands
}]
如果 2 个线程 运行 方法 create_or_append
,这 2 个线程有可能处于 else 闭包中,然后稍微损坏您的数据。也许在这个函数上定义一个锁是个好主意。
from threading import Lock
class QueueBalancer():
def __init__(self, cycle_list=None):
self.cycle_list = [] if cycle_list is None else cycle_list
self.lock = Lock()
# .../...
def create_or_append(self, user, commands):
with self.lock:
# ...
编辑:如@matino 所述,update_status
函数也可能存在一些问题,因为它修改了 processing
实例属性。我会建议在它上面使用另一个锁以确保它是线程安全的。
def update_status(self):
with self.update_lock:
if next((item for item in self.cycle_list if item["commands"] != []), None):
self.processing = True
else:
self.processing = False
你的 class 绝对不是线程安全的,因为你改变了它的实例属性:
在 update_status
你改变 self.processing
值
在create_or_append
你修改self.cycle_list
如果不锁定这些属性,您的 class 将不是线程安全的。
旁注:始终在 __init__
方法中初始化所有实例属性。由于您在代码中使用 self.processing
,因此它应该在 __init__
我想我可以像您描述的那样创建一个通用的平衡队列 - 结果如下。我认为仍然存在一些病态的情况,即一个用户可以按顺序处理许多作业,但这会涉及到其他用户的作业被添加到一个特定的 times/orders,所以我认为这不会发生在实际工作中并且不可能除非多个用户串通才能被利用。
from threading import Lock
class UserBalancedJobQueue(object):
def __init__(self):
self._user_jobs = {}
self._user_list = []
self._user_index = 0
self._lock = Lock()
def pop_user_job(self):
with self._lock:
if not self._user_jobs:
raise ValueError("No jobs to run")
if self._user_index >= len(self._user_list):
self._user_index = 0
user = self._user_list[self._user_index]
jobs = self._user_jobs[user]
job = jobs.pop(0)
if not jobs:
self._delete_current_user()
self._user_index += 1
return user, job
def _delete_current_user(self):
user = self._user_list.pop(self._user_index)
del self._user_jobs[user]
def add_user_job(self, user, job):
with self._lock:
if user not in self._user_jobs:
self._user_list.append(user)
self._user_jobs[user] = []
self._user_jobs[user].append(job)
if __name__ == "__main__":
q = UserBalancedJobQueue()
q.add_user_job("tom", "job1")
q.add_user_job("tom", "job2")
q.add_user_job("tom", "job3")
q.add_user_job("fred", "job4")
q.add_user_job("fred", "job5")
for i in xrange(3):
print q.pop_user_job()
print "Adding more jobs"
q.add_user_job("dave", "job6")
q.add_user_job("dave", "job7")
q.add_user_job("dave", "job8")
q.add_user_job("dave", "job9")
try:
while True:
print q.pop_user_job()
except ValueError:
pass
仔细考虑一下,另一种实现方式是为每个用户记住他们上一份工作的时间 运行,然后根据谁的上份工作最旧来选择下一个用户。它可能会更多 'correct' 但它会有(可能微不足道的)额外内存开销来记住每个用户的上次工作时间。
编辑:所以这是一个缓慢的一天 - 这是另一种方法。我想我更喜欢它而不是上面的,尽管由于 O(N) 搜索具有最旧的以前工作的用户,它更慢。
from collections import defaultdict
from threading import Lock
import time
class UserBalancedJobQueue(object):
def __init__(self):
self._user_jobs = defaultdict(list)
self._user_last_run = defaultdict(lambda: 0.0)
self._lock = Lock()
def pop_user_job(self):
with self._lock:
if not self._user_jobs:
raise ValueError("No jobs to run")
user = min(
self._user_jobs.keys(),
key=lambda u: self._user_last_run[u]
)
self._user_last_run[user] = time.time()
jobs = self._user_jobs[user]
job = jobs.pop(0)
if not jobs:
del self._user_jobs[user]
return user, job
def add_user_job(self, user, job):
with self._lock:
self._user_jobs[user].append(job)
我的项目涉及为客户批量处理图像。客户端发送压缩的图像文件,这会为每个图像触发 ImageMagick 命令行脚本。我试图解决的问题是,如果这些命令按照我收到它们的顺序排队,那么需要处理 10k 图像的客户端将占用所有资源数小时。我的解决方案是循环每个客户端的队列,这样每个人都可以平等地减慢彼此的速度。我创建了这个 class 来实现这个:
class QueueBalancer():
def __init__(self, cycle_list=[]):
self.cycle_list = cycle_list
self.update_status()
def cmd_gen(self):
index = -1
while True:
try:
if self.cycle_list:
self.processing = True
index += 1
commands = self.cycle_list[index]["commands"]
if commands:
command = commands.pop(0)
if len(commands) == 0:
del self.cycle_list[index]
index -= 1
self.update_status()
yield command
else:
yield None
except IndexError:
index = -1
def get_user(self, user):
return next((item for item in self.cycle_list[0] if item["user"] == user), None)
def create_or_append(self, user, commands):
existing_user = self.get_user(user)
if existing_user:
index = self.cycle_list.index(existing_user)
self.cycle_list[index]["commands"] += commands
else:
self.cycle_list += [{
"user" : user,
"commands" : commands
}]
def update_status(self):
if next((item for item in self.cycle_list if item["commands"] != []), None):
self.processing = True
else:
self.processing = False
def status(self):
return self.processing
从[=12=的else子句可以看出,cycle_list
是一个这样的字典列表:
{"user": "test1", "commands": ["command1", "command2"]},
{"user": "test2", "commands": ["x", "y", "z"]},
{"user": "test3", "commands": ["a", "b", "c"]}
(删除了真实命令,使用了示例字符串)
cmd_gen()
的单个实例将用于将命令馈送到我的 shell,我将使用 create_or_append()
即时添加用户和命令,同时队列中的命令仍在处理中。到目前为止,这在我的初始测试中似乎工作得很好,但这在理论上是线程安全的吗?如果不是,我需要做什么来确保它是?
我对以下部分的线程安全有疑问:
def create_or_append(self, user, commands):
existing_user = self.get_user(user)
if existing_user:
index = self.cycle_list.index(existing_user)
self.cycle_list[index]["commands"] += commands
else:
self.cycle_list += [{
"user" : user,
"commands" : commands
}]
如果 2 个线程 运行 方法 create_or_append
,这 2 个线程有可能处于 else 闭包中,然后稍微损坏您的数据。也许在这个函数上定义一个锁是个好主意。
from threading import Lock
class QueueBalancer():
def __init__(self, cycle_list=None):
self.cycle_list = [] if cycle_list is None else cycle_list
self.lock = Lock()
# .../...
def create_or_append(self, user, commands):
with self.lock:
# ...
编辑:如@matino 所述,update_status
函数也可能存在一些问题,因为它修改了 processing
实例属性。我会建议在它上面使用另一个锁以确保它是线程安全的。
def update_status(self):
with self.update_lock:
if next((item for item in self.cycle_list if item["commands"] != []), None):
self.processing = True
else:
self.processing = False
你的 class 绝对不是线程安全的,因为你改变了它的实例属性:
在
update_status
你改变self.processing
值在
create_or_append
你修改self.cycle_list
如果不锁定这些属性,您的 class 将不是线程安全的。
旁注:始终在 __init__
方法中初始化所有实例属性。由于您在代码中使用 self.processing
,因此它应该在 __init__
我想我可以像您描述的那样创建一个通用的平衡队列 - 结果如下。我认为仍然存在一些病态的情况,即一个用户可以按顺序处理许多作业,但这会涉及到其他用户的作业被添加到一个特定的 times/orders,所以我认为这不会发生在实际工作中并且不可能除非多个用户串通才能被利用。
from threading import Lock
class UserBalancedJobQueue(object):
def __init__(self):
self._user_jobs = {}
self._user_list = []
self._user_index = 0
self._lock = Lock()
def pop_user_job(self):
with self._lock:
if not self._user_jobs:
raise ValueError("No jobs to run")
if self._user_index >= len(self._user_list):
self._user_index = 0
user = self._user_list[self._user_index]
jobs = self._user_jobs[user]
job = jobs.pop(0)
if not jobs:
self._delete_current_user()
self._user_index += 1
return user, job
def _delete_current_user(self):
user = self._user_list.pop(self._user_index)
del self._user_jobs[user]
def add_user_job(self, user, job):
with self._lock:
if user not in self._user_jobs:
self._user_list.append(user)
self._user_jobs[user] = []
self._user_jobs[user].append(job)
if __name__ == "__main__":
q = UserBalancedJobQueue()
q.add_user_job("tom", "job1")
q.add_user_job("tom", "job2")
q.add_user_job("tom", "job3")
q.add_user_job("fred", "job4")
q.add_user_job("fred", "job5")
for i in xrange(3):
print q.pop_user_job()
print "Adding more jobs"
q.add_user_job("dave", "job6")
q.add_user_job("dave", "job7")
q.add_user_job("dave", "job8")
q.add_user_job("dave", "job9")
try:
while True:
print q.pop_user_job()
except ValueError:
pass
仔细考虑一下,另一种实现方式是为每个用户记住他们上一份工作的时间 运行,然后根据谁的上份工作最旧来选择下一个用户。它可能会更多 'correct' 但它会有(可能微不足道的)额外内存开销来记住每个用户的上次工作时间。
编辑:所以这是一个缓慢的一天 - 这是另一种方法。我想我更喜欢它而不是上面的,尽管由于 O(N) 搜索具有最旧的以前工作的用户,它更慢。
from collections import defaultdict
from threading import Lock
import time
class UserBalancedJobQueue(object):
def __init__(self):
self._user_jobs = defaultdict(list)
self._user_last_run = defaultdict(lambda: 0.0)
self._lock = Lock()
def pop_user_job(self):
with self._lock:
if not self._user_jobs:
raise ValueError("No jobs to run")
user = min(
self._user_jobs.keys(),
key=lambda u: self._user_last_run[u]
)
self._user_last_run[user] = time.time()
jobs = self._user_jobs[user]
job = jobs.pop(0)
if not jobs:
del self._user_jobs[user]
return user, job
def add_user_job(self, user, job):
with self._lock:
self._user_jobs[user].append(job)