python 设置未来状态
python setting state of a future
设置未来的状态来传递参数是不好的做法吗?
具体使用 future.q = q
之类的东西在回调
中使用 q
from threading import Thread
from threading import RLock
from threading import current_thread
from concurrent.futures import Future
import time
import random
class NonBlockingQueue:
def __init__(self, max_size):
self.max_size = max_size
self.q = []
self.q_waiting_puts = []
self.q_waiting_gets = []
self.lock = RLock()
def enqueue(self, item):
future = None
with self.lock:
curr_size = len(self.q)
# queue is full so create a future for a put
# request
if curr_size == self.max_size:
future = Future()
self.q_waiting_puts.append(future)
else:
self.q.append(item)
# remember to resolve a pending future for
# a get request
if len(self.q_waiting_gets) != 0:
future_get = self.q_waiting_gets.pop(0)
future_get.set_result(self.q.pop(0))
return future
def retry_enqueue(future):
print("\nCallback invoked by thread {0}".format(current_thread().getName()))
item = future.item
q = future.q
new_future = q.enqueue(item)
if new_future is not None:
new_future.item = item
new_future.q = q
new_future.add_done_callback(retry_enqueue)
else:
print("\n{0} successfully added on a retry".format(item))
### MAIN CODE
def producer_thread(q):
item = 1
while 1:
future = q.enqueue(item)
if future is not None:
future.item = item
future.q = q
future.add_done_callback(retry_enqueue)
item += 1
# slow down the producer
time.sleep(random.randint(1, 3))
像这样传递参数不是一个好主意。
原因是将来(没有双关语),他们可能会禁止在 Future
对象上设置自定义属性,这会破坏您的代码。
更好的解决方案是使用 functools.partial
或 lambda
将额外参数传递给回调。
首先,接受 q
作为 retry_enqueue
函数中的参数:
def retry_enqueue(future, q): # accept 'q' argument
...
使用 functools.partial
的示例:
import functools
future.add_done_callback(functools.partial(retry_enqueue, q=q))
使用 lambda
的示例:
future.add_done_callback(lambda future: retry_enqueue(future, q))
设置未来的状态来传递参数是不好的做法吗?
具体使用 future.q = q
之类的东西在回调
q
from threading import Thread
from threading import RLock
from threading import current_thread
from concurrent.futures import Future
import time
import random
class NonBlockingQueue:
def __init__(self, max_size):
self.max_size = max_size
self.q = []
self.q_waiting_puts = []
self.q_waiting_gets = []
self.lock = RLock()
def enqueue(self, item):
future = None
with self.lock:
curr_size = len(self.q)
# queue is full so create a future for a put
# request
if curr_size == self.max_size:
future = Future()
self.q_waiting_puts.append(future)
else:
self.q.append(item)
# remember to resolve a pending future for
# a get request
if len(self.q_waiting_gets) != 0:
future_get = self.q_waiting_gets.pop(0)
future_get.set_result(self.q.pop(0))
return future
def retry_enqueue(future):
print("\nCallback invoked by thread {0}".format(current_thread().getName()))
item = future.item
q = future.q
new_future = q.enqueue(item)
if new_future is not None:
new_future.item = item
new_future.q = q
new_future.add_done_callback(retry_enqueue)
else:
print("\n{0} successfully added on a retry".format(item))
### MAIN CODE
def producer_thread(q):
item = 1
while 1:
future = q.enqueue(item)
if future is not None:
future.item = item
future.q = q
future.add_done_callback(retry_enqueue)
item += 1
# slow down the producer
time.sleep(random.randint(1, 3))
像这样传递参数不是一个好主意。
原因是将来(没有双关语),他们可能会禁止在 Future
对象上设置自定义属性,这会破坏您的代码。
更好的解决方案是使用 functools.partial
或 lambda
将额外参数传递给回调。
首先,接受 q
作为 retry_enqueue
函数中的参数:
def retry_enqueue(future, q): # accept 'q' argument
...
使用 functools.partial
的示例:
import functools
future.add_done_callback(functools.partial(retry_enqueue, q=q))
使用 lambda
的示例:
future.add_done_callback(lambda future: retry_enqueue(future, q))