Python asyncio - heartbeat() 方法不写入流

Python asyncio - heartbeat() method not writing to stream

我正在尝试使用 Python 3 asyncio 做一个概念验证:实现一个客户端/服务器程序,由客户端定期发送心跳(heartbeat)以保持连接。

请注意,服务器只是一个回显服务器,不会关闭连接。但重要的是客户端能够定期发送心跳。

这是当前的实现:

stream_client.py

import asyncio


class StreamClient:
    """Client that logs on to the echo server and sends periodic heartbeats.

    The client connects to 127.0.0.1:9871, sends a logon message and, when
    the server echoes it back, keeps the event loop running while a timer
    writes ``heartbeat`` to the connection every ``heartbeat_int`` seconds.
    """

    def __init__(self, heartbeat_int, loop=None):
        """Store the heartbeat interval and select the event loop.

        Args:
            heartbeat_int: Delay in seconds between two heartbeats.
            loop: Event loop to run on. When omitted, the current event
                loop is used, or a new one if none is available.
        """
        self.heartbeat_int = heartbeat_int

        if loop is None:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # Recent Python versions no longer create a loop implicitly.
                loop = asyncio.new_event_loop()
        self.loop = loop

    def start(self):
        """Connect, log on and run the loop until interrupted.

        This manages the event loop, so it is a plain method rather than a
        coroutine. Once connected, the connection and the loop are closed
        before returning, including when the logon step raises.
        """
        self.loop.run_until_complete(self.get_client())
        try:
            self.loop.create_task(self.start_timed_session())
            msg = self.loop.run_until_complete(self.logon('hello'))

            if msg == 'hello':
                try:
                    self.loop.run_forever()
                except KeyboardInterrupt:
                    print('Closing connection with server...')
            else:
                print('Logon unsuccessful, closing connection with server...')
        finally:
            self.writer.close()
            self.loop.close()

    async def get_client(self):
        """Open the TCP connection and store its reader/writer pair."""
        # No ``loop`` argument: it was removed from the streams API, and the
        # connection is bound to the loop that runs this coroutine.
        self.reader, self.writer = await asyncio.open_connection(
            '127.0.0.1',
            9871,
        )
        print('Connection established at "localhost:9871"')

    async def timed_session(self):
        """Wait one heartbeat interval, then schedule the next cycle."""
        await asyncio.sleep(self.heartbeat_int)
        self.loop.create_task(self.start_timed_session())

    async def start_timed_session(self):
        """Start one timer cycle; ``heartbeat`` runs when the timer is done."""
        heartbeat_task = self.loop.create_task(self.timed_session())
        heartbeat_task.add_done_callback(self.heartbeat)

    async def logon(self, msg):
        """Send ``msg`` and return the server's decoded reply.

        Args:
            msg: Text to send to the server.

        Returns:
            The reply decoded to ``str``; at most 15 bytes are read.
        """
        print('Sending message:', msg)
        self.writer.write(msg.encode())
        data = await self.reader.read(15)
        resp = data.decode()
        print('Data received:', resp)
        return resp

    def heartbeat(self, fut):
        """Write one heartbeat message to the connection.

        This is a future's done-callback, so it:
            1) Can't be a coroutine
            2) Takes the finished future as its argument
        """
        print('> Sending heartbeat...')
        self.writer.write('heartbeat'.encode())

# Create the client with a 5-second heartbeat interval and run it.

client = StreamClient(heartbeat_int=5)
client.start()

stream_server.py

import asyncio


class StreamServer:
    """Echo server handler that processes one message per connection."""

    def __init__(self, loop=None):
        """Select the event loop the server is associated with.

        Args:
            loop: Event loop to use. When omitted, the current event loop
                is used, or a new one if none is available.
        """
        if loop is None:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # Recent Python versions no longer create a loop implicitly.
                loop = asyncio.new_event_loop()
        self.loop = loop

    async def server_handler(self, reader, writer):
        """Handle a newly accepted client connection.

        Reads a single message of at most 15 bytes. A ``bye`` message
        closes the connection; any other message is echoed back and the
        connection is left open.

        Args:
            reader: Stream reader for the client connection.
            writer: Stream writer for the client connection.
        """
        data = await reader.read(15)
        msg = data.decode()
        print('Received data: "{}" from {}'.format(
            msg,
            writer.get_extra_info('peername'),
        ))

        # Compare the decoded text by value. An identity test between the
        # raw bytes and a str literal is always true, which made the
        # closing branch unreachable.
        if msg == 'bye':
            print('Closing the connection...')
            writer.close()
            return

        print('Echoing the message...')
        writer.write(data)
        await writer.drain()


# Create a dedicated event loop and make it the current one, so the server
# and everything scheduled below share it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
stream_server = StreamServer(loop)

# ``start_server`` no longer accepts a ``loop`` argument; the server is
# bound to the loop that runs this coroutine.
coro_server = asyncio.start_server(
    stream_server.server_handler,
    '127.0.0.1',
    9871,
)

server = loop.run_until_complete(coro_server)

print('Listening on:', server.sockets[0].getsockname())

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass  # Ctrl-C is the expected way to stop the server.
finally:
    # Shut down the listening sockets and the loop on any exit path.
    print('Closing server')
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

问题:

heartbeat() 方法中,self.writer.write('heartbeat'.encode()) 行似乎永远不会被执行。

我怎样才能让它工作?

这行代码在客户端确实被执行了,只是服务器端没有代码去接收后续的消息。server_handler 对每个连接只调用一次,而不是对每条消息调用一次。因此,如果您希望它能够从某个客户端接收任意数量的心跳,就需要在 server_handler 内部加一个循环来接收它们:

async def server_handler(self, reader, writer):
    """Echo messages from one client until it says ``bye`` or disconnects.

    Called once per connection. Messages are read in chunks of at most
    15 bytes and written back unchanged. The connection is closed when
    the client sends ``bye`` or closes its side.

    Args:
        reader: Stream reader for the client connection.
        writer: Stream writer for the client connection.
    """
    peer = writer.get_extra_info('peername')

    while True:  # Keep receiving data from client.
        data = await reader.read(15)
        if not data:
            # An empty read means EOF; without this check the loop would
            # spin forever on a closed connection.
            break

        msg = data.decode()
        print('Received data: "{}" from {}'.format(msg, peer))

        # Compare the decoded text by value; an identity test between
        # bytes and a str literal is always true.
        if msg == 'bye':
            break

        print('Echoing the message...')
        writer.write(data)
        await writer.drain()

    print('Closing the connection...')
    writer.close()