PUB/SUB ZeroMQ 中的模式不工作

PUB/SUB pattern in ZeroMQ not working

我正在尝试使用 ZeroMQ 实现一个非常基本的 PUB/SUB 模式。我想要一个服务器(始终处于活动状态)向所有客户端广播消息(发布者)并且不关心连接的客户端。 如果客户端作为订阅者连接到此服务器,它应该会收到消息。

但是,我无法使用 PUB/SUB 发送消息。

在 Python 中将是:

# publisher (server.py)
import zmq

ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)
publisher.bind('tcp://127.0.0.1:9091')

while True:
    publisher.send_string("test")

# subscriber (client.py)
import zmq

ctx = zmq.Context()
subscriber = ctx.socket(zmq.SUB)
subscriber.connect('tcp://127.0.0.1:9091')

while True:
    msg = subscriber.recv_string()
    print msg

或者在 golang 中:

package main

import (
    "github.com/pebbe/zmq4"
    "log"
    "time"
)

func Listen(subscriber *zmq4.Socket) {
    for {
        s, err := subscriber.Recv(0)
        if err != nil {
            log.Println(err)
            continue
        }
        log.Println("rec", s)
    }
}

func main() {
    publisher, _ := zmq4.NewSocket(zmq4.PUB)
    defer publisher.Close()
    publisher.Bind("tcp://*:9090")

    subscriber, _ := zmq4.NewSocket(zmq4.SUB)
    defer subscriber.Close()
    subscriber.Connect("tcp://127.0.0.1:9090")

    go Listen(subscriber)
    for _ = range time.Tick(time.Second) {
        publisher.Send("test", 0)
        log.Println("send", "test")

    }
}

我是否误解了这种模式,或者我是否需要在连接时从客户端向服务器发送特定信号。我对golang版本感兴趣,只使用python版本进行测试。

我是不是误解了这个模式?是的,幸运的是你做到了。

ZeroMQ 原型被定义为代表某种行为。如前所述,PUSH-archetype AccessPoint 将每条消息推送到 "through" 所有到目前为止设置的通信渠道,PULL-er AccessPoint 将任何已经到达的消息拉到 "it's hands", PUB-lisher AccessPoint 发布,SUB-scriber AccessPoint 订阅,以便仅接收与其主题过滤器匹配的消息,而不接收任何其他消息。

很明显,这样的 Archetype "specification" 有助于构建 ZeroMQ 智能消息传递/信号基础设施,以便于我们在 架构中使用。


# subscriber (client.py)
import zmq

ctx        = zmq.Context()
subscriber = ctx.socket( zmq.SUB )
subscriber.connect( 'tcp://127.0.0.1:9091' )
subscriber.setsockopt( zmq.LINGER,    0 )         # ALWAYS:
subscriber.setsockopt( zmq.SUBSCRIBE, "" )        # OTHERWISE NOTHING DELIVERED

while True:
    msg = subscriber.recv_string()                # MAY USE .poll() + zmq.NOBLOCK
    print msg

subscriber, _ := zmq4.NewSocket( zmq4.SUB )
subscriber.Connect(             "tcp://127.0.0.1:9090" )
subscriber.SetSubscribe(         filter )                 // SET: <topic-filter>

subscriber.SetLinger(            0 )    //  SAFETY FIRST: PREVENT DEADLOCK
defer subscriber.Close()                //  NOW MAY SAFELY SET:

...
msg, _ := subscriber.Recv( 0 )

根据定义,任何实例化 SUB 端 AccessPoint 对象的权利实际上是零机会知道的,哪些消息的选择是那些正确的, 所以它们应该是 "delivered" 而什么消息不是。

在没有这一初始知识的情况下,ZeroMQ 设计者的主要选择是保持 原型策略一致,并让PUB 端的 AccessNode 分发所有 .send()-仅向那些SUB-侧访问节点获取消息,这些访问节点已明确请求通过zmq.SUBSCRIBE-机制或[=83接收任何此类消息=] 将从 PUB 发送的所有内容也发送给所有尚未决定的 SUB-s.

前者是 ZeroMQ 作者一致且专业的设计步骤。
后者实际上意味着违反 ZeroMQ 自己的 RFC 规范。

后一种选择就像一个人刚搬到新公寓,很难期望从第二天早上起所有报纸和杂志都会出现在他的新邮箱中,对吧?但是,如果订阅了波士顿环球报,第二天早上,新版本将在门口,因为它会一直在那里,直到有人取消订阅或报纸破产或缺少纸卷导致印刷厂无法交付在适当的时候和时尚或 Big Dig 隧道的交通拥堵可能会在某一天对所有人或仅对本地交付造成麻烦。

这一切都是自然而然的,并且符合原型政策。

插曲:Golang 已经绑定到许多不同的 API 版本
技术纯粹主义者会在这里反对,早期的 API 版本(直到某些 v3.2+)实际上确实在技术上将所有消息有效负载从 PUB 传输到所有 SUB-s,因为它简化了PUB 端的工作负载包络,但增加了传输 class(es) 数据流和 SUB 端的资源/延迟的主题过滤器处理。然而,所有这些都隐藏在用户代码之外,就在 API 抽象范围内。因此,除了需要适当扩展资源外,这对用户来说是透明的。最近的 API 版本恢复了主题过滤器处理器的角色,并让它现在发生在 PUB 端。然而,在这两种情况下,ZeroMQ RFC 规范策略都是以这样的方式实现的,SUB-side 永远不会传递(通过 .recv()-interface)一条与有效不匹配的消息, 显式 SUB 端订阅

在所有情况下,SUB 方尚未明确设置任何 zmq.SUBSCRIBE 指示的主题过滤器,它不能也不会传递任何内容 (这与为 SUB 类型的 AccessPoint 定义的 ZeroMQ RFC 原型策略既自然又完全一致)。

最佳下一步:

总是,至少,阅读 ZeroMQ API 文档,其中所有细节都是专业指定的 - 所以至少,人们可以第一眼看到智能消息传递/信号框架的气息。

这不会帮助任何人在全新领域开始并完全构建自己复杂的心理概念和深入理解所有事物的内部运作方式,这显然不是任何 API-文档的野心,是吗?然而,这将帮助任何人刷新或提醒所有可配置的细节,一旦掌握了 ZeroMQ 内部架构,如源代码中详述,在下一段中提到。

另外,对于任何确实对 或只是 本身感兴趣的人来说,花时间和精力阅读 Pieter HINTJENS 的文章是值得的书 "Code Connected, Volume 1"(以 pdf 形式免费提供)加上他以后关于他在软件工程方面丰富经验的任何其他书籍,因为他对现代计算的许多见解可能并且将会启发(和很多)。

编辑:

GO 中的 MWE

package main

import (
    "github.com/pebbe/zmq4"
    "log"
    "time"
)

func Listen(subscriber *zmq4.Socket) {
    for {
        s, err := subscriber.Recv(0)
        if err != nil {
            log.Println(err)
            continue
        }
        log.Println("rec", s)
    }
}

func main() {
    publisher, _ := zmq4.NewSocket(zmq4.PUB)
    publisher.SetLinger(0)
    defer publisher.Close()

    publisher.Bind("tcp://127.0.0.1:9092")

    subscriber, _ := zmq4.NewSocket(zmq4.SUB)
    subscriber.SetLinger(0)
    defer subscriber.Close()

    subscriber.Connect("tcp://127.0.0.1:9092")
    subscriber.SetSubscribe("")

    go Listen(subscriber)
    for _ = range time.Tick(time.Second) {
        publisher.Send("test", 0)
        log.Println("send", "test")
    }
}