python 当 运行 来自 class 时 nats 连接错误

python nats connection error when running from a class

我正在尝试 运行 一个在继续之前等待特定 nats 消息的应用程序。所以我做了以下 class 发送消息并收听消息:

#!/usr/bin/python3
from nats.aio.client import Client as NATS
import asyncio


class NatsService:

    def __init__(self):
        self.NATSERVER = "XXXXX"
        self.CHANNEL = "msgchannel"
        self.CONNECTTIMEOUT = 1
        self.MAXRECONNECTATTEMPTS = 1
        self.RECONNECTTIMEWAIT = 1
        self.nc = NATS()

    async def send_message(self, message, channel=None):
        if not channel:
            channel = self.CHANNEL

        print("connecting to nats server")
        await self.nc.connect(self.NATSERVER, self.CONNECTTIMEOUT,
                              max_reconnect_attempts=self.MAXRECONNECTATTEMPTS,
                              reconnect_time_wait=self.RECONNECTTIMEWAIT)

        print(f"Publishing message: '{message}' to channel: {channel}")
        await self.nc.publish(channel, message.encode('utf-8'))
        print("message sent, closing connection")
        await self.nc.close()
        print("nats server connection closed")

    def start_listening(self):
        loop = asyncio.get_event_loop()
        try:
            loop.create_task(self.listener_loop(loop))
            loop.run_forever()
        finally:
            loop.close()

    async def listener_loop(self, loop):
        print("connecting to nats listener loop")
        await self.nc.connect(self.NATSERVER, loop=loop)

        async def message_handler(msg):
            subject = msg.subject
            data = msg.data.decode()
            print('Received a message on {}: {}'.format(subject, data))

            if eval(data.split(":::")[1]):
                print("message received, closing")
                await nc.drain()    # timeout occurs for some reason
                print("stopping loop")
                loop.stop()
            
        await self.nc.subscribe(self.CHANNEL, cb=msg_handler)

我正在将这个 class 导入到两个应用程序中,一个应该发送消息,另一个应该监听这些消息直到收到正确的消息。

我的主应用程序侦听消息,只有在收到正确的消息后才会继续

from nats_service import NatsService

try:
    print("starting nats service instance")
    ns = NatsService()
    print("listening for approved message")
    start_listening()
except Exception as e:
    print(f"Error: {e}")

print(f"Contiuing with application...")

另一个应用程序用于发送消息:

from nats_service import NatsService
import asyncio

async def main():
    ns = NatsService()
    message = "test"
    await ns.send_message(message)

if __name__=="__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print(f"Completed sender function.")

我能够在将它们组合在一起之前创建的独立函数上发送和接收消息。但是当从上面的 class 导入它们时,我似乎无法 运行 它们,运行 尤其是 asyncio 问题。

经过反复试验,当我 运行 发件人时,它似乎终于开始了,但立即失败并出现一个我不明白的错误,也找不到太多关于以下内容的信息:

connecting to nats server
nats: encountered error
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1185, in _select_next_server
    connection_future, self.options['connect_timeout']
  File "/usr/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
    return fut.result()
  File "/usr/lib/python3.7/asyncio/streams.py", line 75, in open_connection
    protocol = StreamReaderProtocol(reader, loop=loop)
  File "/usr/lib/python3.7/asyncio/streams.py", line 227, in __init__
    self._closed = self._loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Traceback (most recent call last):
  File "sender_side.py", line 13, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "sender_side.py", line 9, in main
    await ns.send_message(message)
  File "/home/bot10-sigma/nats_tests/nats_service.py", line 23, in send_message
    reconnect_time_wait=self.RECONNECTTIMEWAIT)
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 317, in connect
    await self._select_next_server()
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1174, in _select_next_server
    self.options["reconnect_time_wait"], loop=self._loop
  File "/usr/lib/python3.7/asyncio/tasks.py", line 563, in sleep
    future = loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Exception ignored in: <function StreamReaderProtocol.__del__ at 0x761cacd8>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/streams.py", line 271, in __del__
AttributeError: 'StreamReaderProtocol' object has no attribute '_closed'

您正在将 self.CONNECTTIMEOUT 作为位置参数传递给 Client.connect,它希望对其 io_loop 参数进行引用。这就是为什么你得到 AttributeErrorint 没有 create_future 属性。将超时作为 connect_timeout=self.CONNECTTIMEOUT 传递,此问题应该会消失。