Python asyncio - heartbeat() 方法不写入流
Python asyncio - heartbeat() method not writing to stream
我正在尝试使用 Python 3 asyncio 创建一个概念验证,实现一个定期发送 heartbeats 的客户端服务器以保持连接。
请注意,服务器只是一个回显服务器,不会关闭连接。但重要的是客户端能够定期发送心跳。
这是当前的实现:
stream_client.py
import asyncio
class StreamClient:
def __init__(self, heartbeat_int, loop=None):
self.heartbeat_int = heartbeat_int
if loop is not None:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
def start(self):
""" start manages the event loop, but it is not a coroutine """
self.loop.run_until_complete(self.get_client())
self.loop.create_task(self.start_timed_session())
msg = self.loop.run_until_complete(self.logon('hello'))
if msg == 'hello':
try:
self.loop.run_forever()
except KeyboardInterrupt:
print('Closing connection with server...')
self.writer.close()
self.loop.close()
else:
print('Logon unsuccessful, closing connection with server...')
self.writer.close()
self.loop.close()
@asyncio.coroutine
def get_client(self):
self.reader, self.writer = yield from asyncio.open_connection(
'127.0.0.1',
9871,
loop=self.loop
)
print('Connection established at "localhost:9871"')
@asyncio.coroutine
def timed_session(self):
yield from asyncio.sleep(self.heartbeat_int)
self.loop.create_task(self.start_timed_session())
@asyncio.coroutine
def start_timed_session(self):
heartbeat_task = self.loop.create_task(self.timed_session())
heartbeat_task.add_done_callback(self.heartbeat)
@asyncio.coroutine
def logon(self, msg):
print('Sending message:', msg)
self.writer.write(msg.encode())
data = yield from self.reader.read(15)
resp = data.decode()
print('Data received:', resp)
return resp
def heartbeat(self, fut):
"""
This is future's callback:
1) Can't be a coroutine
2) Takes a future as an argument
"""
print('> Sending heartbeat...')
self.writer.write('heartbeat'.encode())
# Start the client
client = StreamClient(5)
client.start()
stream_server.py
import asyncio
class StreamServer:
def __init__(self, loop=None):
if loop is not None:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
@asyncio.coroutine
def server_handler(self, reader, writer):
data = yield from reader.read(15)
msg = data.decode()
if data is not 'bye':
print('Received data: "{}" from {}'.format(msg, writer.get_extra_info('peername')))
print('Echoing the message...')
writer.write(data)
yield from writer.drain()
else:
print('Received data: "{}" from {}'.format(
data,
writer.get_extra_info('peername')
)
)
print('Closing the connection...')
writer.close()
loop = asyncio.get_event_loop()
stream_server = StreamServer(loop)
coro_server = asyncio.start_server(
stream_server.server_handler,
'127.0.0.1',
9871,
loop=stream_server.loop
)
server = loop.run_until_complete(coro_server)
print('Listening on:', server.sockets[0].getsockname())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
print('Closing server')
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
问题:
在 heartbeat()
方法中,self.writer.write('heartbeat'.encode())
行似乎永远不会被执行。
我怎样才能让它工作?
代码正在客户端执行,您只是在服务器端没有任何代码来接收消息。 server_handler
每个连接 只调用一次 ,而不是每条消息调用一次。因此,如果您希望它能够从给定客户端接收无限数量的心跳,则需要在 server_handler
内部设置一个循环来接收它们:
@asyncio.coroutine
def server_handler(self, reader, writer):
while True: # Keep receiving data from client.
data = yield from reader.read(15)
msg = data.decode()
if data is not 'bye':
print('Received data: "{}" from {}'.format(msg, writer.get_extra_info('peername')))
print('Echoing the message...')
writer.write(data)
yield from writer.drain()
else:
print('Received data: "{}" from {}'.format(
data,
writer.get_extra_info('peername')
)
)
print('Closing the connection...')
writer.close()
我正在尝试使用 Python 3 asyncio 创建一个概念验证,实现一个定期发送 heartbeats 的客户端服务器以保持连接。
请注意,服务器只是一个回显服务器,不会关闭连接。但重要的是客户端能够定期发送心跳。
这是当前的实现:
stream_client.py
import asyncio
class StreamClient:
def __init__(self, heartbeat_int, loop=None):
self.heartbeat_int = heartbeat_int
if loop is not None:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
def start(self):
""" start manages the event loop, but it is not a coroutine """
self.loop.run_until_complete(self.get_client())
self.loop.create_task(self.start_timed_session())
msg = self.loop.run_until_complete(self.logon('hello'))
if msg == 'hello':
try:
self.loop.run_forever()
except KeyboardInterrupt:
print('Closing connection with server...')
self.writer.close()
self.loop.close()
else:
print('Logon unsuccessful, closing connection with server...')
self.writer.close()
self.loop.close()
@asyncio.coroutine
def get_client(self):
self.reader, self.writer = yield from asyncio.open_connection(
'127.0.0.1',
9871,
loop=self.loop
)
print('Connection established at "localhost:9871"')
@asyncio.coroutine
def timed_session(self):
yield from asyncio.sleep(self.heartbeat_int)
self.loop.create_task(self.start_timed_session())
@asyncio.coroutine
def start_timed_session(self):
heartbeat_task = self.loop.create_task(self.timed_session())
heartbeat_task.add_done_callback(self.heartbeat)
@asyncio.coroutine
def logon(self, msg):
print('Sending message:', msg)
self.writer.write(msg.encode())
data = yield from self.reader.read(15)
resp = data.decode()
print('Data received:', resp)
return resp
def heartbeat(self, fut):
"""
This is future's callback:
1) Can't be a coroutine
2) Takes a future as an argument
"""
print('> Sending heartbeat...')
self.writer.write('heartbeat'.encode())
# Start the client
client = StreamClient(5)
client.start()
stream_server.py
import asyncio
class StreamServer:
def __init__(self, loop=None):
if loop is not None:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
@asyncio.coroutine
def server_handler(self, reader, writer):
data = yield from reader.read(15)
msg = data.decode()
if data is not 'bye':
print('Received data: "{}" from {}'.format(msg, writer.get_extra_info('peername')))
print('Echoing the message...')
writer.write(data)
yield from writer.drain()
else:
print('Received data: "{}" from {}'.format(
data,
writer.get_extra_info('peername')
)
)
print('Closing the connection...')
writer.close()
loop = asyncio.get_event_loop()
stream_server = StreamServer(loop)
coro_server = asyncio.start_server(
stream_server.server_handler,
'127.0.0.1',
9871,
loop=stream_server.loop
)
server = loop.run_until_complete(coro_server)
print('Listening on:', server.sockets[0].getsockname())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
print('Closing server')
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
问题:
在 heartbeat()
方法中,self.writer.write('heartbeat'.encode())
行似乎永远不会被执行。
我怎样才能让它工作?
代码正在客户端执行,您只是在服务器端没有任何代码来接收消息。 server_handler
每个连接 只调用一次 ,而不是每条消息调用一次。因此,如果您希望它能够从给定客户端接收无限数量的心跳,则需要在 server_handler
内部设置一个循环来接收它们:
@asyncio.coroutine
def server_handler(self, reader, writer):
while True: # Keep receiving data from client.
data = yield from reader.read(15)
msg = data.decode()
if data is not 'bye':
print('Received data: "{}" from {}'.format(msg, writer.get_extra_info('peername')))
print('Echoing the message...')
writer.write(data)
yield from writer.drain()
else:
print('Received data: "{}" from {}'.format(
data,
writer.get_extra_info('peername')
)
)
print('Closing the connection...')
writer.close()