gevent/threading 导致一些死锁
gevent/threading causes some deadlock
我有这段代码,其目的是对请求进行重复数据删除。
def dedup_requests(f):
pending = {}
@functools.wraps(f)
def wrapped(*args, **kwargs):
key = _make_call_key(args, kwargs)
if key not in pending:
pending[key] = gevent.spawn(f, *args, **kwargs)
result = pending[key].get()
if key in pending:
del pending[key]
return result
return wrapped
我怀疑它以某种方式导致了死锁(这种情况偶尔发生,我无法重现)。
使用线程和 gevent 时都会发生。
允许重复使用get
吗?
当不涉及线程时,这段代码甚至会产生死锁吗?
请注意,它在其他 gevent 任务下运行,因此生成的任务可能会生成其他任务,以防出现问题。
你的问题是你的代码不是异步的。您需要让函数本身处理密钥更新,然后在 while 循环中测试您的值。这是异步工作的一个例子。您可以通过注意到最后一个元素有时出现在列表的最前面来证明这一点。
import gevent
import random
pending = {}
def dedup_requests(key, *args, **kwargs):
global pending
if key not in pending:
gevent.spawn(ftest, key, *args, **kwargs)
def ftest(key, *args, **kwargs):
global pending
z = random.randint(1,7)
gevent.sleep(z)
pending[key] = z
return z
l = ['test','test2','test3']
for i in l:
dedup_requests(i)
while 1:
if set(pending.keys()) != set(l):
print(pending)
else:
print(pending)
break
gevent.sleep(1)
虽然我仍然不完全理解死锁的根源(我最好的猜测是 get
在多次调用时并没有真正按预期工作),这似乎有效:
from gevent import lock
def queue_identical_calls(f, max_size=100):
pending = {}
@functools.wraps(f)
def wrapped(*args, **kwargs):
key = _make_call_key(args, kwargs)
if key not in pending:
pending[key] = lock.BoundedSemaphore(1)
lock_for_current_call = pending[key]
lock_for_current_call.acquire()
result = f(*args, **kwargs)
lock_for_current_call.release()
if len(pending) > max_size:
pending.clear()
return result
return wrapped
我有这段代码,其目的是对请求进行重复数据删除。
def dedup_requests(f):
pending = {}
@functools.wraps(f)
def wrapped(*args, **kwargs):
key = _make_call_key(args, kwargs)
if key not in pending:
pending[key] = gevent.spawn(f, *args, **kwargs)
result = pending[key].get()
if key in pending:
del pending[key]
return result
return wrapped
我怀疑它以某种方式导致了死锁(这种情况偶尔发生,我无法重现)。
使用线程和 gevent 时都会发生。
允许重复使用get
吗?
当不涉及线程时,这段代码甚至会产生死锁吗?
请注意,它在其他 gevent 任务下运行,因此生成的任务可能会生成其他任务,以防出现问题。
你的问题是你的代码不是异步的。您需要让函数本身处理密钥更新,然后在 while 循环中测试您的值。这是异步工作的一个例子。您可以通过注意到最后一个元素有时出现在列表的最前面来证明这一点。
import gevent
import random
pending = {}
def dedup_requests(key, *args, **kwargs):
global pending
if key not in pending:
gevent.spawn(ftest, key, *args, **kwargs)
def ftest(key, *args, **kwargs):
global pending
z = random.randint(1,7)
gevent.sleep(z)
pending[key] = z
return z
l = ['test','test2','test3']
for i in l:
dedup_requests(i)
while 1:
if set(pending.keys()) != set(l):
print(pending)
else:
print(pending)
break
gevent.sleep(1)
虽然我仍然不完全理解死锁的根源(我最好的猜测是 get
在多次调用时并没有真正按预期工作),这似乎有效:
from gevent import lock
def queue_identical_calls(f, max_size=100):
pending = {}
@functools.wraps(f)
def wrapped(*args, **kwargs):
key = _make_call_key(args, kwargs)
if key not in pending:
pending[key] = lock.BoundedSemaphore(1)
lock_for_current_call = pending[key]
lock_for_current_call.acquire()
result = f(*args, **kwargs)
lock_for_current_call.release()
if len(pending) > max_size:
pending.clear()
return result
return wrapped