ZeroMQ 在断开连接的对等点上进行循环故障转移
ZeroMQ round-robin fail-over on disconnected peers
我正在使用 ZeroMQ 的多重连接功能,将单个 DEALER
连接到 2 ROUTERS
:
+----> .Connect() --> ROUTER 1
/
DEALER ---+------> .Connect() --> ROUTER 2
在我的测试中,我通过 DEALER
发送了 10 条消息。我向每个 ROUTER
-s.
返回了 5 条消息的均匀分布
我的问题是,如果 ROUTER 1
由于某种原因消失了,DEALER
仍会继续为它排队消息,我认为假设 ROUTER 1
最终会到来背部。我在 ROUTER 2
.
上只收到了 5 条消息
我需要做的是 DEALER
忽略断开连接或失败的对等点。这可能吗?
我尝试设置 ZMQ_SNDHWM
和许多其他设置,但似乎没有任何效果。
我能看到的唯一选择是自己进行故障转移,使用单独的套接字、心跳和 ACK 数据包等。看起来 ZeroMQ 应该已经实现了这样一个基本模式。
编辑: 测试代码
package main
import (
"github.com/pebbe/zmq4"
"time"
"log"
"fmt"
)
func receiveAll(sok *zmq4.Socket) (received int) {
poller := zmq4.NewPoller()
poller.Add(sok, zmq4.POLLIN)
for {
sockets, err := poller.Poll(100 * time.Millisecond)
if err != nil {
log.Print(err)
}
if len(sockets) > 0 {
for _, s := range sockets {
msg, _ := s.Socket.RecvMessageBytes(0)
if string(msg[1]) != "Hello World" {
log.Fatalf("Unexpected message: %s\n", msg)
}
received ++
}
} else {
return
}
}
}
func main() {
dealer, _ := zmq4.NewSocket(zmq4.DEALER)
router1, _ := zmq4.NewSocket(zmq4.ROUTER)
router2, _ := zmq4.NewSocket(zmq4.ROUTER)
router1.Bind("tcp://0.0.0.0:6667")
router2.Bind("tcp://0.0.0.0:6668")
dealer.Connect("tcp://0.0.0.0:6667")
dealer.Connect("tcp://0.0.0.0:6668")
router1.SetSubscribe("")
router2.SetSubscribe("")
dealer.SetSubscribe("")
for i := 0; i < 10; i++ {
dealer.SendBytes([]byte("Hello World"), 0)
}
time.Sleep(300 * time.Millisecond)
count1 := receiveAll(router1)
count2 := receiveAll(router2)
fmt.Printf("Blue sky scenario: count1=%d count2=%d\n", count1, count2)
// Shut down a peer
router1.Close()
time.Sleep(300 * time.Millisecond)
for i := 0; i < 10; i++ {
dealer.SendBytes([]byte("Hello World"), 0)
}
time.Sleep(300 * time.Millisecond)
count := receiveAll(router2)
fmt.Printf("Peer 1 offline: count=%d\n", count)
}
What I need to happen is for DEALER
to ignore disconnected or failed peers. Is this possible ?
当然可以。需要调整默认(非活动)值,使用您在 :
中的用例特定设置
- a
.setsockopt(
ZMQ.IMMEDIATE
, 1 )
用于不缓冲对等方的消息实例,这似乎不是 "alive"
- a
.setsockopt(
ZMQ.HEARTBEAT_IVL
, <ms> )
用于发送心跳
- a
.setsockopt(
ZMQ.HEARTBEAT_TTL
, <ms> )
用于生存时间设置
- a
.setsockopt(
ZMQ.HEARTBEAT_TIMEOUT
, <ms>)
超时阈值
- a
.setsockopt(
ZMQ.HANDSHAKE_IVL
, <ms> )
用于管理(重新)建立超时。
有关详细信息,请检查您的语言绑定以及它实际使用的原生 API 版本。大多数这些设置自 native-API v 3.x 以来可用,最新的 native-API v 4.2.2 文档将帮助您调整值和配置策略。
我正在使用 ZeroMQ 的多重连接功能,将单个 DEALER
连接到 2 ROUTERS
:
+----> .Connect() --> ROUTER 1
/
DEALER ---+------> .Connect() --> ROUTER 2
在我的测试中,我通过 DEALER
发送了 10 条消息。我向每个 ROUTER
-s.
我的问题是,如果 ROUTER 1
由于某种原因消失了,DEALER
仍会继续为它排队消息,我认为假设 ROUTER 1
最终会到来背部。我在 ROUTER 2
.
我需要做的是 DEALER
忽略断开连接或失败的对等点。这可能吗?
我尝试设置 ZMQ_SNDHWM
和许多其他设置,但似乎没有任何效果。
我能看到的唯一选择是自己进行故障转移,使用单独的套接字、心跳和 ACK 数据包等。看起来 ZeroMQ 应该已经实现了这样一个基本模式。
编辑: 测试代码
package main
import (
"github.com/pebbe/zmq4"
"time"
"log"
"fmt"
)
func receiveAll(sok *zmq4.Socket) (received int) {
poller := zmq4.NewPoller()
poller.Add(sok, zmq4.POLLIN)
for {
sockets, err := poller.Poll(100 * time.Millisecond)
if err != nil {
log.Print(err)
}
if len(sockets) > 0 {
for _, s := range sockets {
msg, _ := s.Socket.RecvMessageBytes(0)
if string(msg[1]) != "Hello World" {
log.Fatalf("Unexpected message: %s\n", msg)
}
received ++
}
} else {
return
}
}
}
func main() {
dealer, _ := zmq4.NewSocket(zmq4.DEALER)
router1, _ := zmq4.NewSocket(zmq4.ROUTER)
router2, _ := zmq4.NewSocket(zmq4.ROUTER)
router1.Bind("tcp://0.0.0.0:6667")
router2.Bind("tcp://0.0.0.0:6668")
dealer.Connect("tcp://0.0.0.0:6667")
dealer.Connect("tcp://0.0.0.0:6668")
router1.SetSubscribe("")
router2.SetSubscribe("")
dealer.SetSubscribe("")
for i := 0; i < 10; i++ {
dealer.SendBytes([]byte("Hello World"), 0)
}
time.Sleep(300 * time.Millisecond)
count1 := receiveAll(router1)
count2 := receiveAll(router2)
fmt.Printf("Blue sky scenario: count1=%d count2=%d\n", count1, count2)
// Shut down a peer
router1.Close()
time.Sleep(300 * time.Millisecond)
for i := 0; i < 10; i++ {
dealer.SendBytes([]byte("Hello World"), 0)
}
time.Sleep(300 * time.Millisecond)
count := receiveAll(router2)
fmt.Printf("Peer 1 offline: count=%d\n", count)
}
What I need to happen is for
DEALER
to ignore disconnected or failed peers. Is this possible ?
当然可以。需要调整默认(非活动)值,使用您在 :
中的用例特定设置- a
.setsockopt(
ZMQ.IMMEDIATE
, 1 )
用于不缓冲对等方的消息实例,这似乎不是 "alive" - a
.setsockopt(
ZMQ.HEARTBEAT_IVL
, <ms> )
用于发送心跳 - a
.setsockopt(
ZMQ.HEARTBEAT_TTL
, <ms> )
用于生存时间设置 - a
.setsockopt(
ZMQ.HEARTBEAT_TIMEOUT
, <ms>)
超时阈值 - a
.setsockopt(
ZMQ.HANDSHAKE_IVL
, <ms> )
用于管理(重新)建立超时。
有关详细信息,请检查您的语言绑定以及它实际使用的原生 API 版本。大多数这些设置自 native-API v 3.x 以来可用,最新的 native-API v 4.2.2 文档将帮助您调整值和配置策略。