ZeroMQ 在断开连接的对等点上进行循环故障转移

ZeroMQ round-robin fail-over on disconnected peers

我正在使用 ZeroMQ 的多重连接功能,将单个 DEALER 连接到 2 ROUTERS :

            +----> .Connect() --> ROUTER 1
           /
DEALER ---+------> .Connect() --> ROUTER 2

在我的测试中,我通过 DEALER 发送了 10 条消息。我向每个 ROUTER-s.

返回了 5 条消息的均匀分布

我的问题是,如果 ROUTER 1 由于某种原因消失了,DEALER 仍会继续为它排队消息,我认为假设 ROUTER 1 最终会到来背部。我在 ROUTER 2.

上只收到了 5 条消息

我需要做的是 DEALER 忽略断开连接或失败的对等点。这可能吗?

我尝试设置 ZMQ_SNDHWM 和许多其他设置,但似乎没有任何效果。

我能看到的唯一选择是自己进行故障转移,使用单独的套接字、心跳和 ACK 数据包等。看起来 ZeroMQ 应该已经实现了这样一个基本模式。


编辑: 测试代码

package main

import (
    "github.com/pebbe/zmq4"
    "time"
    "log"
    "fmt"
)

func receiveAll(sok *zmq4.Socket) (received int) {
    poller := zmq4.NewPoller()
    poller.Add(sok, zmq4.POLLIN)

    for {
        sockets, err := poller.Poll(100 * time.Millisecond)
        if err != nil {
            log.Print(err)
        }
        if len(sockets) > 0 {
            for _, s := range sockets {
                msg, _ := s.Socket.RecvMessageBytes(0)
                if string(msg[1]) != "Hello World" {
                    log.Fatalf("Unexpected message: %s\n", msg)
                }
                received ++
            }
        } else {
            return
        }
    }
}

func main() {

    dealer, _ := zmq4.NewSocket(zmq4.DEALER)
    router1, _ := zmq4.NewSocket(zmq4.ROUTER)
    router2, _ := zmq4.NewSocket(zmq4.ROUTER)

    router1.Bind("tcp://0.0.0.0:6667")
    router2.Bind("tcp://0.0.0.0:6668")

    dealer.Connect("tcp://0.0.0.0:6667")
    dealer.Connect("tcp://0.0.0.0:6668")

    router1.SetSubscribe("")
    router2.SetSubscribe("")
    dealer.SetSubscribe("")

    for i := 0; i < 10; i++ {
        dealer.SendBytes([]byte("Hello World"), 0)
    }

    time.Sleep(300 * time.Millisecond)

    count1 := receiveAll(router1)
    count2 := receiveAll(router2)

    fmt.Printf("Blue sky scenario: count1=%d count2=%d\n", count1, count2)

    // Shut down a peer
    router1.Close()
    time.Sleep(300 * time.Millisecond)

    for i := 0; i < 10; i++ {
        dealer.SendBytes([]byte("Hello World"), 0)
    }

    time.Sleep(300 * time.Millisecond)

    count := receiveAll(router2)

    fmt.Printf("Peer 1 offline: count=%d\n", count)

}

What I need to happen is for DEALER to ignore disconnected or failed peers. Is this possible ?

当然可以。需要调整默认(非活动)值,使用您在 :

中的用例特定设置
  • a .setsockopt( ZMQ.IMMEDIATE, 1 ) 用于不缓冲对等方的消息实例,这似乎不是 "alive"
  • a .setsockopt( ZMQ.HEARTBEAT_IVL, <ms> ) 用于发送心跳
  • a .setsockopt( ZMQ.HEARTBEAT_TTL, <ms> ) 用于生存时间设置
  • a .setsockopt( ZMQ.HEARTBEAT_TIMEOUT, <ms>) 超时阈值
  • a .setsockopt( ZMQ.HANDSHAKE_IVL, <ms> ) 用于管理(重新)建立超时。

有关详细信息,请检查您的语言绑定以及它实际使用的原生 API 版本。大多数这些设置自 native-API v 3.x 以来可用,最新的 native-API v 4.2.2 文档将帮助您调整值和配置策略。