发布-订阅和消息总线 Python

Publish-Subscribe and a Message bus Python

我正在尝试为我的一些 python 模块创建中央日志记录系统。 我希望能够从多个带有日志的模块发送消息,然后中央记录器获取它们并进行处理。

为简单起见,我希望我的模块 A 看起来像这样:

  bus = connect_to_a_bus_that_is_always_there
  while True:
    #Publish a message to message bus, pseudo code
    bus.publish(topic="logs.a", message="example")
    time sleep(1)

和记录器(唯一的订阅者)

def actOnNewMessage(msg):
  if msg.topic.subtopic == "a":
     doSomethingForA(msg.data)

bus = connect_to_a_bus_that_is_always_there
bus.subscribe("logs", handler=actOnNewMessage)

while True:
  #wait for messages

现在 Logger 模块就像一个库,所以它不是永久性的 运行,所以也许我可以在 Logger 和 Message Bus 之间引入一些东西,它会不断地监视新消息。

我看过PyPubSub,但它似乎没有在文档中介绍不同运行ning python 模块之间的持续通信。如果有人试过这个,如果我可以在不同的模块之间使用它,它对我有用。

另一个问题是我最终可能会得到未用 python 编写的模块,所以我真的不想在模块 A、B 和 Logger 之间进行直接通信。 最后我的架构可能是这样的:

希望以上信息不会造成混淆。

tl;dr: Publish-Subscribe with persistent message bus in python and a subscriber that is constantly waiting for new messages. Any ready-to-use solution?

编辑: 我正在考虑 运行 设置一个知道 Logger 模块的 web 套接字服务器,而其他模块 A、B 知道 websocket 的地址。这种设计有什么缺点吗?

您可以将 redis 作为代理,并在单独的进程中 运行 logger.py。

logger.py

import redis

r = redis.Redis()

while True:
    next_log_item = r.blpop(['logs'], 0)
    write_to_db(next_log_item)

a.py

import redis
import time

r = redis.Redis()

while True:
    r.rpush('logs', message)
    time.sleep(1)

Opensplice 是一种消息总线,允许持久的缓冲数据通信。 不要推出自己的消息总线!它们是复杂的野兽。

为什么不简单地使用系统日志?有些版本的 syslog 还支持从多个节点到中央收集点的日志记录。许多编程语言都支持它,包括 python.

强烈 推荐您使用标准的python 日志框架。它允许您使用各种标准记录器(例如 SyslogHandler、SocketHandler 和 DatagramHandler)选择日志的去向。

它甚至允许您编写自己的处理程序,如果您必须...

我遇到过 nanomsg。非常适合我的需求,拥有 MIT 许可且无需额外的服务器 运行。此外,还有我想使用的任何语言的绑定。

from nanomsg import Socket, PUB

s = Socket(PUB)
s.connect('tcp://localhost:8080')
s.send('topicMessage')

from nanomsg import Socket, SUB

s = Socket(SUB)
s.connect('tcp://localhost:8080')
s.set_string_option(SUB, SUB_SUBSCRIBE, "topic")
while True:
    print(s.recv())