发布-订阅和消息总线 Python
Publish-Subscribe and a Message bus Python
我正在尝试为我的一些 python 模块创建中央日志记录系统。
我希望能够从多个带有日志的模块发送消息,然后中央记录器获取它们并进行处理。
为简单起见,我希望我的模块 A 看起来像这样:
bus = connect_to_a_bus_that_is_always_there
while True:
#Publish a message to message bus, pseudo code
bus.publish(topic="logs.a", message="example")
time sleep(1)
和记录器(唯一的订阅者)
def actOnNewMessage(msg):
if msg.topic.subtopic == "a":
doSomethingForA(msg.data)
bus = connect_to_a_bus_that_is_always_there
bus.subscribe("logs", handler=actOnNewMessage)
while True:
#wait for messages
现在 Logger 模块就像一个库,所以它不是永久性的 运行,所以也许我可以在 Logger 和 Message Bus 之间引入一些东西,它会不断地监视新消息。
我看过PyPubSub,但它似乎没有在文档中介绍不同运行ning python 模块之间的持续通信。如果有人试过这个,如果我可以在不同的模块之间使用它,它对我有用。
另一个问题是我最终可能会得到未用 python 编写的模块,所以我真的不想在模块 A、B 和 Logger 之间进行直接通信。
最后我的架构可能是这样的:
希望以上信息不会造成混淆。
tl;dr: Publish-Subscribe with persistent message bus in python and a
subscriber that is constantly waiting for new messages. Any ready-to-use solution?
编辑:
我正在考虑 运行 设置一个知道 Logger 模块的 web 套接字服务器,而其他模块 A、B 知道 websocket 的地址。这种设计有什么缺点吗?
您可以将 redis 作为代理,并在单独的进程中 运行 logger.py。
logger.py
import redis
r = redis.Redis()
while True:
next_log_item = r.blpop(['logs'], 0)
write_to_db(next_log_item)
a.py
import redis
import time
r = redis.Redis()
while True:
r.rpush('logs', message)
time.sleep(1)
Opensplice 是一种消息总线,允许持久的缓冲数据通信。 不要推出自己的消息总线!它们是复杂的野兽。
为什么不简单地使用系统日志?有些版本的 syslog 还支持从多个节点到中央收集点的日志记录。许多编程语言都支持它,包括 python.
我强烈 推荐您使用标准的python 日志框架。它允许您使用各种标准记录器(例如 SyslogHandler、SocketHandler 和 DatagramHandler)选择日志的去向。
它甚至允许您编写自己的处理程序,如果您必须...
我遇到过 nanomsg。非常适合我的需求,拥有 MIT 许可且无需额外的服务器 运行。此外,还有我想使用的任何语言的绑定。
from nanomsg import Socket, PUB
s = Socket(PUB)
s.connect('tcp://localhost:8080')
s.send('topicMessage')
from nanomsg import Socket, SUB
s = Socket(SUB)
s.connect('tcp://localhost:8080')
s.set_string_option(SUB, SUB_SUBSCRIBE, "topic")
while True:
print(s.recv())
我正在尝试为我的一些 python 模块创建中央日志记录系统。 我希望能够从多个带有日志的模块发送消息,然后中央记录器获取它们并进行处理。
为简单起见,我希望我的模块 A 看起来像这样:
bus = connect_to_a_bus_that_is_always_there
while True:
#Publish a message to message bus, pseudo code
bus.publish(topic="logs.a", message="example")
time sleep(1)
和记录器(唯一的订阅者)
def actOnNewMessage(msg):
if msg.topic.subtopic == "a":
doSomethingForA(msg.data)
bus = connect_to_a_bus_that_is_always_there
bus.subscribe("logs", handler=actOnNewMessage)
while True:
#wait for messages
现在 Logger 模块就像一个库,所以它不是永久性的 运行,所以也许我可以在 Logger 和 Message Bus 之间引入一些东西,它会不断地监视新消息。
我看过PyPubSub,但它似乎没有在文档中介绍不同运行ning python 模块之间的持续通信。如果有人试过这个,如果我可以在不同的模块之间使用它,它对我有用。
另一个问题是我最终可能会得到未用 python 编写的模块,所以我真的不想在模块 A、B 和 Logger 之间进行直接通信。
最后我的架构可能是这样的:
希望以上信息不会造成混淆。
tl;dr: Publish-Subscribe with persistent message bus in python and a subscriber that is constantly waiting for new messages. Any ready-to-use solution?
编辑: 我正在考虑 运行 设置一个知道 Logger 模块的 web 套接字服务器,而其他模块 A、B 知道 websocket 的地址。这种设计有什么缺点吗?
您可以将 redis 作为代理,并在单独的进程中 运行 logger.py。
logger.py
import redis
r = redis.Redis()
while True:
next_log_item = r.blpop(['logs'], 0)
write_to_db(next_log_item)
a.py
import redis
import time
r = redis.Redis()
while True:
r.rpush('logs', message)
time.sleep(1)
Opensplice 是一种消息总线,允许持久的缓冲数据通信。 不要推出自己的消息总线!它们是复杂的野兽。
为什么不简单地使用系统日志?有些版本的 syslog 还支持从多个节点到中央收集点的日志记录。许多编程语言都支持它,包括 python.
我强烈 推荐您使用标准的python 日志框架。它允许您使用各种标准记录器(例如 SyslogHandler、SocketHandler 和 DatagramHandler)选择日志的去向。
它甚至允许您编写自己的处理程序,如果您必须...
我遇到过 nanomsg。非常适合我的需求,拥有 MIT 许可且无需额外的服务器 运行。此外,还有我想使用的任何语言的绑定。
from nanomsg import Socket, PUB
s = Socket(PUB)
s.connect('tcp://localhost:8080')
s.send('topicMessage')
from nanomsg import Socket, SUB
s = Socket(SUB)
s.connect('tcp://localhost:8080')
s.set_string_option(SUB, SUB_SUBSCRIBE, "topic")
while True:
print(s.recv())