如何使用 concurrent.futures.ThreadPoolExecutor 或 multiprocessing.pool.ThreadPool 将一些变量绑定到线程?
How to bind some variable to thread with concurrent.futures.ThreadPoolExecutor or multiprocessing.pool.ThreadPool?
我想做的是这样的:
class MyThread(threading.Thread):
def __init__(self, host, port):
threading.Thread.__init__(self)
# self._sock = self.initsocket(host, port)
self._id = random.randint(0, 100)
def run(self):
for i in range(3):
print("current id: {}".format(self._id))
def main():
ts = []
for i in range(5):
t = MyThread("localhost", 3001)
t.start()
ts.append(t)
for t in ts:
t.join()
我得到了这些输出:
current id: 10
current id: 10
current id: 13
current id: 43
current id: 13
current id: 10
current id: 83
current id: 83
current id: 83
current id: 13
current id: 98
current id: 43
current id: 98
current id: 43
current id: 98
这个输出就是我想要的 可以看到,我的_id
在不同的线程中是不同的,但是在单线程中,我共享相同的_id
.(_id
只是其中一个变量,我还有很多其他类似的变量).
现在,我想用 multiprocessing.pool.ThreadPool
做同样的事情
class MyProcessor():
def __init__(self, host, port):
# self._sock = self.initsocket(host, port)
self._id = random.randint(0, 100)
def __call__(self, i):
print("current id: {}".format(self._id))
return self._id * i
def main():
with ThreadPool(5) as p:
p.map(MyProcessor("localhost", 3001), range(15))
但是现在_id
将被所有线程共享:
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
对于concurrent.futures.ThreadPoolExecutor
,我也尝试做同样的事情:
class MyProcessor():
def __init__(self, host, port):
# self.initsocket(host, port)
self._id = random.randint(0, 100)
def __call__(self, i):
print("current id: {}".format(self._id))
return self._id * i
def main():
with ThreadPoolExecutor(max_workers=5) as executor:
func = MyProcessor("localhost", 3001)
futures = [executor.submit(func, i) for i in range(15)]
for f in as_completed(futures):
pass
输出是这样的:
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
当然,我得到这个结果并不奇怪,因为我只是调用__init__
一次。但我要问的是:
如何用 concurrent.futures.ThreadPoolExecutor
和 multiprocessing.pool.ThreadPool
做同样的事情(也请不要再使用全局变量)。
这里有几个问题,我会尽力解决所有问题。
在您给出的第一个示例中,您可以完全控制您创建的所有 Thread
,因此每个线程在初始化程序中都有一个唯一的 ID。问题当然是你一次启动所有线程,这对于大量线程来说可能是非常低效的。
在问题的两个线程池示例中,您为可调用对象初始化了一次 ID,因此当然每个线程没有单独的 ID。正确的做法是为每个线程初始化一个 ID,方法是在 __call__
方法中进行:
class MyProcessor():
def __init__(self, host, port):
self.initsocket(host, port)
def __call__(self, i):
id_ = random.randint(0, 100)
print("current id: {}".format(id_))
return id_ * i
def main():
func = MyProcessor("localhost", 3001)
with ThreadPoolExecutor(max_workers=5) as executor:
collections.deque(executor.map(MyProcessor, range(15)), maxlen=0)
请注意,您可以缩短 concurrent.futures.ThreadPoolExecutor
example by using the map
method there as well, if all you care about is the final result and not the intermediate Future
objects. The deque(..., maxlen=0)
调用是使用迭代器的标准用法。
鉴于您在评论中链接到的要点,我理解您为什么想要线程本地数据。但是,您当然不需要全局变量来实现该结果。这里有几个备选方案:
只需将您的 thread-local 数据添加到初始化程序中的 self
,瞧,所有调用都可以访问它而无需全局:
def __init__(self, host, port):
self.thread_local = threading.local()
def __call__(self, i):
try:
id_ = self.thread_local.id_
except AttributeError:
id_ = random.randint(0, 100)
...
使用函数局部数据而不是线程局部数据。您正在使用线程本地数据来避免将您的连接(在要点中)传递给某些私有函数。这不是真正的需要,只是一种审美选择。你总是可以有 def _send_data(self, conn, **kwargs)
和 def _recv_data(self, conn)
,因为无论如何,连接实际来自的唯一地方是 __call__
。
虽然在某些情况下选项 #1 是可行的,但我强烈建议您不要将它与任何类型一起使用线程池管理器。线程池可以重用相同的线程到 运行 个任务,这些任务从它们提交到的队列中按顺序排列。这意味着您将在本应自行打开的任务中得到相同的连接。在您的原始示例中,您可以独立创建所有线程,但是当您在回收池线程上多次调用 MyProcessor
时,它可能不会很好。
我想做的是这样的:
class MyThread(threading.Thread):
def __init__(self, host, port):
threading.Thread.__init__(self)
# self._sock = self.initsocket(host, port)
self._id = random.randint(0, 100)
def run(self):
for i in range(3):
print("current id: {}".format(self._id))
def main():
ts = []
for i in range(5):
t = MyThread("localhost", 3001)
t.start()
ts.append(t)
for t in ts:
t.join()
我得到了这些输出:
current id: 10
current id: 10
current id: 13
current id: 43
current id: 13
current id: 10
current id: 83
current id: 83
current id: 83
current id: 13
current id: 98
current id: 43
current id: 98
current id: 43
current id: 98
这个输出就是我想要的 可以看到,我的_id
在不同的线程中是不同的,但是在单线程中,我共享相同的_id
.(_id
只是其中一个变量,我还有很多其他类似的变量).
现在,我想用 multiprocessing.pool.ThreadPool
class MyProcessor():
def __init__(self, host, port):
# self._sock = self.initsocket(host, port)
self._id = random.randint(0, 100)
def __call__(self, i):
print("current id: {}".format(self._id))
return self._id * i
def main():
with ThreadPool(5) as p:
p.map(MyProcessor("localhost", 3001), range(15))
但是现在_id
将被所有线程共享:
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
对于concurrent.futures.ThreadPoolExecutor
,我也尝试做同样的事情:
class MyProcessor():
def __init__(self, host, port):
# self.initsocket(host, port)
self._id = random.randint(0, 100)
def __call__(self, i):
print("current id: {}".format(self._id))
return self._id * i
def main():
with ThreadPoolExecutor(max_workers=5) as executor:
func = MyProcessor("localhost", 3001)
futures = [executor.submit(func, i) for i in range(15)]
for f in as_completed(futures):
pass
输出是这样的:
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
当然,我得到这个结果并不奇怪,因为我只是调用__init__
一次。但我要问的是:
如何用 concurrent.futures.ThreadPoolExecutor
和 multiprocessing.pool.ThreadPool
做同样的事情(也请不要再使用全局变量)。
这里有几个问题,我会尽力解决所有问题。
在您给出的第一个示例中,您可以完全控制您创建的所有 Thread
,因此每个线程在初始化程序中都有一个唯一的 ID。问题当然是你一次启动所有线程,这对于大量线程来说可能是非常低效的。
在问题的两个线程池示例中,您为可调用对象初始化了一次 ID,因此当然每个线程没有单独的 ID。正确的做法是为每个线程初始化一个 ID,方法是在 __call__
方法中进行:
class MyProcessor(): def __init__(self, host, port): self.initsocket(host, port) def __call__(self, i): id_ = random.randint(0, 100) print("current id: {}".format(id_)) return id_ * i def main(): func = MyProcessor("localhost", 3001) with ThreadPoolExecutor(max_workers=5) as executor: collections.deque(executor.map(MyProcessor, range(15)), maxlen=0)
请注意,您可以缩短 concurrent.futures.ThreadPoolExecutor
example by using the map
method there as well, if all you care about is the final result and not the intermediate Future
objects. The deque(..., maxlen=0)
调用是使用迭代器的标准用法。
鉴于您在评论中链接到的要点,我理解您为什么想要线程本地数据。但是,您当然不需要全局变量来实现该结果。这里有几个备选方案:
只需将您的 thread-local 数据添加到初始化程序中的
self
,瞧,所有调用都可以访问它而无需全局:def __init__(self, host, port): self.thread_local = threading.local() def __call__(self, i): try: id_ = self.thread_local.id_ except AttributeError: id_ = random.randint(0, 100) ...
使用函数局部数据而不是线程局部数据。您正在使用线程本地数据来避免将您的连接(在要点中)传递给某些私有函数。这不是真正的需要,只是一种审美选择。你总是可以有
def _send_data(self, conn, **kwargs)
和def _recv_data(self, conn)
,因为无论如何,连接实际来自的唯一地方是__call__
。
虽然在某些情况下选项 #1 是可行的,但我强烈建议您不要将它与任何类型一起使用线程池管理器。线程池可以重用相同的线程到 运行 个任务,这些任务从它们提交到的队列中按顺序排列。这意味着您将在本应自行打开的任务中得到相同的连接。在您的原始示例中,您可以独立创建所有线程,但是当您在回收池线程上多次调用 MyProcessor
时,它可能不会很好。