python 当 运行 来自 class 时 nats 连接错误
python nats connection error when running from a class
我正在尝试 运行 一个在继续之前等待特定 nats 消息的应用程序。所以我做了以下 class 发送消息并收听消息:
#!/usr/bin/python3
from nats.aio.client import Client as NATS
import asyncio
class NatsService:
def __init__(self):
self.NATSERVER = "XXXXX"
self.CHANNEL = "msgchannel"
self.CONNECTTIMEOUT = 1
self.MAXRECONNECTATTEMPTS = 1
self.RECONNECTTIMEWAIT = 1
self.nc = NATS()
async def send_message(self, message, channel=None):
if not channel:
channel = self.CHANNEL
print("connecting to nats server")
await self.nc.connect(self.NATSERVER, self.CONNECTTIMEOUT,
max_reconnect_attempts=self.MAXRECONNECTATTEMPTS,
reconnect_time_wait=self.RECONNECTTIMEWAIT)
print(f"Publishing message: '{message}' to channel: {channel}")
await self.nc.publish(channel, message.encode('utf-8'))
print("message sent, closing connection")
await self.nc.close()
print("nats server connection closed")
def start_listening(self):
loop = asyncio.get_event_loop()
try:
loop.create_task(self.listener_loop(loop))
loop.run_forever()
finally:
loop.close()
async def listener_loop(self, loop):
print("connecting to nats listener loop")
await self.nc.connect(self.NATSERVER, loop=loop)
async def message_handler(msg):
subject = msg.subject
data = msg.data.decode()
print('Received a message on {}: {}'.format(subject, data))
if eval(data.split(":::")[1]):
print("message received, closing")
await nc.drain() # timeout occurs for some reason
print("stopping loop")
loop.stop()
await self.nc.subscribe(self.CHANNEL, cb=msg_handler)
我正在将这个 class 导入到两个应用程序中,一个应该发送消息,另一个应该监听这些消息直到收到正确的消息。
我的主应用程序侦听消息,只有在收到正确的消息后才会继续
from nats_service import NatsService
try:
print("starting nats service instance")
ns = NatsService()
print("listening for approved message")
start_listening()
except Exception as e:
print(f"Error: {e}")
print(f"Contiuing with application...")
另一个应用程序用于发送消息:
from nats_service import NatsService
import asyncio
async def main():
ns = NatsService()
message = "test"
await ns.send_message(message)
if __name__=="__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print(f"Completed sender function.")
我能够在将它们组合在一起之前创建的独立函数上发送和接收消息。但是当从上面的 class 导入它们时,我似乎无法 运行 它们,运行 尤其是 asyncio 问题。
经过反复试验,当我 运行 发件人时,它似乎终于开始了,但立即失败并出现一个我不明白的错误,也找不到太多关于以下内容的信息:
connecting to nats server
nats: encountered error
Traceback (most recent call last):
File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1185, in _select_next_server
connection_future, self.options['connect_timeout']
File "/usr/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
return fut.result()
File "/usr/lib/python3.7/asyncio/streams.py", line 75, in open_connection
protocol = StreamReaderProtocol(reader, loop=loop)
File "/usr/lib/python3.7/asyncio/streams.py", line 227, in __init__
self._closed = self._loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Traceback (most recent call last):
File "sender_side.py", line 13, in <module>
loop.run_until_complete(main())
File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "sender_side.py", line 9, in main
await ns.send_message(message)
File "/home/bot10-sigma/nats_tests/nats_service.py", line 23, in send_message
reconnect_time_wait=self.RECONNECTTIMEWAIT)
File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 317, in connect
await self._select_next_server()
File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1174, in _select_next_server
self.options["reconnect_time_wait"], loop=self._loop
File "/usr/lib/python3.7/asyncio/tasks.py", line 563, in sleep
future = loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Exception ignored in: <function StreamReaderProtocol.__del__ at 0x761cacd8>
Traceback (most recent call last):
File "/usr/lib/python3.7/asyncio/streams.py", line 271, in __del__
AttributeError: 'StreamReaderProtocol' object has no attribute '_closed'
您正在将 self.CONNECTTIMEOUT
作为位置参数传递给 Client.connect
,它希望对其 io_loop
参数进行引用。这就是为什么你得到 AttributeError
:int
没有 create_future
属性。将超时作为 connect_timeout=self.CONNECTTIMEOUT
传递,此问题应该会消失。
我正在尝试 运行 一个在继续之前等待特定 nats 消息的应用程序。所以我做了以下 class 发送消息并收听消息:
#!/usr/bin/python3
from nats.aio.client import Client as NATS
import asyncio
class NatsService:
def __init__(self):
self.NATSERVER = "XXXXX"
self.CHANNEL = "msgchannel"
self.CONNECTTIMEOUT = 1
self.MAXRECONNECTATTEMPTS = 1
self.RECONNECTTIMEWAIT = 1
self.nc = NATS()
async def send_message(self, message, channel=None):
if not channel:
channel = self.CHANNEL
print("connecting to nats server")
await self.nc.connect(self.NATSERVER, self.CONNECTTIMEOUT,
max_reconnect_attempts=self.MAXRECONNECTATTEMPTS,
reconnect_time_wait=self.RECONNECTTIMEWAIT)
print(f"Publishing message: '{message}' to channel: {channel}")
await self.nc.publish(channel, message.encode('utf-8'))
print("message sent, closing connection")
await self.nc.close()
print("nats server connection closed")
def start_listening(self):
loop = asyncio.get_event_loop()
try:
loop.create_task(self.listener_loop(loop))
loop.run_forever()
finally:
loop.close()
async def listener_loop(self, loop):
print("connecting to nats listener loop")
await self.nc.connect(self.NATSERVER, loop=loop)
async def message_handler(msg):
subject = msg.subject
data = msg.data.decode()
print('Received a message on {}: {}'.format(subject, data))
if eval(data.split(":::")[1]):
print("message received, closing")
await nc.drain() # timeout occurs for some reason
print("stopping loop")
loop.stop()
await self.nc.subscribe(self.CHANNEL, cb=msg_handler)
我正在将这个 class 导入到两个应用程序中,一个应该发送消息,另一个应该监听这些消息直到收到正确的消息。
我的主应用程序侦听消息,只有在收到正确的消息后才会继续
from nats_service import NatsService
try:
print("starting nats service instance")
ns = NatsService()
print("listening for approved message")
start_listening()
except Exception as e:
print(f"Error: {e}")
print(f"Contiuing with application...")
另一个应用程序用于发送消息:
from nats_service import NatsService
import asyncio
async def main():
ns = NatsService()
message = "test"
await ns.send_message(message)
if __name__=="__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print(f"Completed sender function.")
我能够在将它们组合在一起之前创建的独立函数上发送和接收消息。但是当从上面的 class 导入它们时,我似乎无法 运行 它们,运行 尤其是 asyncio 问题。
经过反复试验,当我 运行 发件人时,它似乎终于开始了,但立即失败并出现一个我不明白的错误,也找不到太多关于以下内容的信息:
connecting to nats server
nats: encountered error
Traceback (most recent call last):
File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1185, in _select_next_server
connection_future, self.options['connect_timeout']
File "/usr/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
return fut.result()
File "/usr/lib/python3.7/asyncio/streams.py", line 75, in open_connection
protocol = StreamReaderProtocol(reader, loop=loop)
File "/usr/lib/python3.7/asyncio/streams.py", line 227, in __init__
self._closed = self._loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Traceback (most recent call last):
File "sender_side.py", line 13, in <module>
loop.run_until_complete(main())
File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "sender_side.py", line 9, in main
await ns.send_message(message)
File "/home/bot10-sigma/nats_tests/nats_service.py", line 23, in send_message
reconnect_time_wait=self.RECONNECTTIMEWAIT)
File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 317, in connect
await self._select_next_server()
File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1174, in _select_next_server
self.options["reconnect_time_wait"], loop=self._loop
File "/usr/lib/python3.7/asyncio/tasks.py", line 563, in sleep
future = loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Exception ignored in: <function StreamReaderProtocol.__del__ at 0x761cacd8>
Traceback (most recent call last):
File "/usr/lib/python3.7/asyncio/streams.py", line 271, in __del__
AttributeError: 'StreamReaderProtocol' object has no attribute '_closed'
您正在将 self.CONNECTTIMEOUT
作为位置参数传递给 Client.connect
,它希望对其 io_loop
参数进行引用。这就是为什么你得到 AttributeError
:int
没有 create_future
属性。将超时作为 connect_timeout=self.CONNECTTIMEOUT
传递,此问题应该会消失。