条件变量不等待两个线程
condition variable not holding wait across two threads
我遇到了这个线程同步问题。
基本上,我正在写入一个输出缓冲区,并等待一个条件变量,直到读取缓冲区填充了来自套接字的响应。这是一个非常简单的线程同步。
def write_wait_response(self, buffer, timeout=30):
'''
Write and wait for response
Params:
Buffer BYTE encoded data
Timeout timeout to wait for response
Returns:
response str if successful
'''
self.buffer = buffer
if self.waitLock(timeout):
# condition var was signaled, we can return a response
readbuf = bytes(self.readbuffer)
self.readbuffer = b''
return readbuf
else:
print("AsyncClientSocket: No response recieved from {} in {} seconds, dumping buffer".format(
self.sa, timeout))
self.buffer = ''
raise TimeoutError("AsyncClientSocket Timed Out")
def handle_read(self):
self.readbuffer, address = self.recvfrom(2048)
print(self.readbuffer)
print("notifying")
self.cond.notifyAll()
看起来很简单,对吧?有 1 个线程在等待条件变量,还有 1 个线程(asyncore 异步回调循环)将填充 self.readbuffer 并通知条件变量。更好奇:如果我执行 time.sleep() 而不是使用条件变量,我会在 write_wait_response() 的调用线程上得到一个完美填充的 self.readbuffer。显然这不是我能接受的方案
这是我期待的结果:
- 调用write_wait_response(缓冲区),写入缓冲区并等待
在 self.cond
- asyncore 回调循环调用 handle_write,将字节写入套接字。
- 服务器接收字节,写入响应。
- asyncore 回调循环看到套接字上的字节,读入
self.readbuffer, 通知 cv
- ?????????? write_wait_response 应该解锁吗?
控制台输出:
waiting <- thread 1 waiting on CV
AsyncClientSocket: writing 5 bytes <- thread 2: handle_write
b'200,2' <- thread 2: that's the server response
notifying <- thread 2: that's handle_read attempting to notify the held CV
error: uncaptured python exception, closing channel <my_socket_stuff.AsyncClientSocket connected 127.0.0.1:50000 at 0x1051bf438> (<class 'RuntimeError'>:cannot notify on un-acquired lock
注意:在此日志的末尾,线程 1 仍在等待 self.cond。怎么回事?
完整 class:
class AsyncClientSocket(asyncore.dispatcher):
def __init__(self, socketargs):
asyncore.dispatcher.__init__(self)
family, type, proto, canonname, sa = socketargs
self.sa = sa
self.create_socket(family, type)
if type == socket.SOCK_STREAM:
self.connect( sa )
elif type == socket.SOCK_DGRAM:
pass
self.buffer = b''
self.lock = threading.Lock()
self.cond = threading.Condition(self.lock)
self.readbuffer = b''
def write_wait_response(self, buffer, timeout=30):
'''
Write and wait for response
Params:
Buffer BYTE encoded data
Timeout timeout to wait for response
Returns:
response str if successful
'''
self.buffer = buffer
if self.waitLock(timeout):
# condition var was signaled, we can return a response
readbuf = bytes(self.readbuffer)
self.readbuffer = b''
return readbuf
else:
print("AsyncClientSocket: No response recieved from {} in {} seconds, dumping buffer".format(
self.sa, timeout))
self.buffer = ''
raise TimeoutError("AsyncClientSocket Timed Out")
def waitLock(self, timeout):
'''
Wait for timeout seconds on CV
'''
try:
self.cond.acquire()
print("waiting")
return self.cond.wait(timeout)
finally:
self.cond.release()
def handle_connect(self):
pass
def handle_close(self):
self.close()
def handle_read(self):
self.readbuffer, address = self.recvfrom(2048)
print(self.readbuffer)
print("notifying")
self.cond.notifyAll()
def writable(self):
return (len(self.buffer) > 0)
def handle_write(self):
print("AsyncClientSocket: writing {} bytes".format(len(self.buffer)))
self.readbuffer = b''
sent = self.sendto(self.buffer, self.sa)
self.buffer = self.buffer[sent:]
想通了。这与 asyncore 无关。我只是错误地向条件变量发出信号。 The python3 threading api doc says the calling thread of notify() must acquire the underlying lock 这是有道理的,不希望两个生产者通知同一个条件变量。会希望一个在关键部分阻塞,而另一个执行其任务。
def handle_read(self):
try:
self.cond.acquire()
self.readbuffer, address = self.recvfrom(2048)
print(self.readbuffer)
self.cond.notify()
finally:
self.cond.release()
我遇到了这个线程同步问题。 基本上,我正在写入一个输出缓冲区,并等待一个条件变量,直到读取缓冲区填充了来自套接字的响应。这是一个非常简单的线程同步。
def write_wait_response(self, buffer, timeout=30):
'''
Write and wait for response
Params:
Buffer BYTE encoded data
Timeout timeout to wait for response
Returns:
response str if successful
'''
self.buffer = buffer
if self.waitLock(timeout):
# condition var was signaled, we can return a response
readbuf = bytes(self.readbuffer)
self.readbuffer = b''
return readbuf
else:
print("AsyncClientSocket: No response recieved from {} in {} seconds, dumping buffer".format(
self.sa, timeout))
self.buffer = ''
raise TimeoutError("AsyncClientSocket Timed Out")
def handle_read(self):
self.readbuffer, address = self.recvfrom(2048)
print(self.readbuffer)
print("notifying")
self.cond.notifyAll()
看起来很简单,对吧?有 1 个线程在等待条件变量,还有 1 个线程(asyncore 异步回调循环)将填充 self.readbuffer 并通知条件变量。更好奇:如果我执行 time.sleep() 而不是使用条件变量,我会在 write_wait_response() 的调用线程上得到一个完美填充的 self.readbuffer。显然这不是我能接受的方案
这是我期待的结果:
- 调用write_wait_response(缓冲区),写入缓冲区并等待 在 self.cond
- asyncore 回调循环调用 handle_write,将字节写入套接字。
- 服务器接收字节,写入响应。
- asyncore 回调循环看到套接字上的字节,读入 self.readbuffer, 通知 cv
- ?????????? write_wait_response 应该解锁吗?
控制台输出:
waiting <- thread 1 waiting on CV
AsyncClientSocket: writing 5 bytes <- thread 2: handle_write
b'200,2' <- thread 2: that's the server response
notifying <- thread 2: that's handle_read attempting to notify the held CV
error: uncaptured python exception, closing channel <my_socket_stuff.AsyncClientSocket connected 127.0.0.1:50000 at 0x1051bf438> (<class 'RuntimeError'>:cannot notify on un-acquired lock
注意:在此日志的末尾,线程 1 仍在等待 self.cond。怎么回事?
完整 class:
class AsyncClientSocket(asyncore.dispatcher):
def __init__(self, socketargs):
asyncore.dispatcher.__init__(self)
family, type, proto, canonname, sa = socketargs
self.sa = sa
self.create_socket(family, type)
if type == socket.SOCK_STREAM:
self.connect( sa )
elif type == socket.SOCK_DGRAM:
pass
self.buffer = b''
self.lock = threading.Lock()
self.cond = threading.Condition(self.lock)
self.readbuffer = b''
def write_wait_response(self, buffer, timeout=30):
'''
Write and wait for response
Params:
Buffer BYTE encoded data
Timeout timeout to wait for response
Returns:
response str if successful
'''
self.buffer = buffer
if self.waitLock(timeout):
# condition var was signaled, we can return a response
readbuf = bytes(self.readbuffer)
self.readbuffer = b''
return readbuf
else:
print("AsyncClientSocket: No response recieved from {} in {} seconds, dumping buffer".format(
self.sa, timeout))
self.buffer = ''
raise TimeoutError("AsyncClientSocket Timed Out")
def waitLock(self, timeout):
'''
Wait for timeout seconds on CV
'''
try:
self.cond.acquire()
print("waiting")
return self.cond.wait(timeout)
finally:
self.cond.release()
def handle_connect(self):
pass
def handle_close(self):
self.close()
def handle_read(self):
self.readbuffer, address = self.recvfrom(2048)
print(self.readbuffer)
print("notifying")
self.cond.notifyAll()
def writable(self):
return (len(self.buffer) > 0)
def handle_write(self):
print("AsyncClientSocket: writing {} bytes".format(len(self.buffer)))
self.readbuffer = b''
sent = self.sendto(self.buffer, self.sa)
self.buffer = self.buffer[sent:]
想通了。这与 asyncore 无关。我只是错误地向条件变量发出信号。 The python3 threading api doc says the calling thread of notify() must acquire the underlying lock 这是有道理的,不希望两个生产者通知同一个条件变量。会希望一个在关键部分阻塞,而另一个执行其任务。
def handle_read(self):
try:
self.cond.acquire()
self.readbuffer, address = self.recvfrom(2048)
print(self.readbuffer)
self.cond.notify()
finally:
self.cond.release()