mangos的问题——Golang包提供的nanomsg总线协议
Problems with mangos - the nanomsg bus protocol provided by Golang package
我想用nanomsg
/nng
作为全分布式点对点多节点网络的通信基础,帮助构建拓扑发现的动态能力和维护。现在我陷入了它的 Golang 包 mangos
.
Python 和 pynng(它是 nanomsg 的 python 绑定)也做了同样的工作,但是当我使用 Go 并通过 mangos 调用相应的方法时,它们的行为是完全不同。谜题主要有三层:
- bus-type-Socket 的Recv() 默认以阻塞模式运行,似乎不可配置为非阻塞模式。文件说:
OptionRecvDeadline is the time until the next Recv times out. The value is a time.Duration. Zero value may be passed to indicate that no timeout should be applied. A negative value indicates a non-blocking operation. By default there is no timeout.
我相应地尝试了 负值,但 Recv()
仍然阻塞。我还应该做什么?以及如何理解“零超时”和“非阻塞”的区别?
(s *socket) NewDialer(...)
返回的 dialer
似乎在调用 dialer.Close()
后仍然存在,因为调用下一个 dialer.Dial()
时会发生错误,报告它仍然是“地址在利用”。但是当我再次尝试 Close()
和 dialer
时,错误发生并报告它已经关闭。我还尝试了以下选项的不同组合,但所有尝试都失败了
opts := make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true // or false
opts[mangos.OptionMaxReconnectTime] = time.Millisecond // or zero
opts[mangos.OptionKeepAliveTime] = time.Millisecond // or even smaller
opts[mangos.OptionKeepAlive] = false // or true
想彻底杀掉拨号器,或者过段时间想重用“伪关闭”拨号器怎么办?
- bus-type-Socket 的
Send()
很奇怪。通常每个节点都应该在我的代码中定期发送消息。我关闭了一个节点(比如“Node-X”)与网络的物理连接,让它离线一段时间,然后重新连接到网络。我发现 Node-X 在重新连接时会立即重新发送大量消息。但我真正期望的是,即使没有邻居,Node-X 也可以将这些消息发送到空中。
不知有什么办法可以克服这些问题。我猜它可能缺少一些选项或配置,但我没有弄明白。
以下代码用于再现重拨和重关闭错误。
package main
import (
"fmt"
"os"
"time"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol/bus"
// register transports
_ "go.nanomsg.org/mangos/v3/transport/all"
)
var (
sock mangos.Socket
DialerMap map[string]*mangos.Dialer
opts map[string]interface{}
)
func main() {
var err error
opts = make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true
opts[mangos.OptionMaxReconnectTime] = time.Millisecond
// opts[mangos.OptionKeepAliveTime] = time.Millisecond
opts[mangos.OptionKeepAlive] = false
DialerMap = make(map[string]*mangos.Dialer)
if sock, err = bus.NewSocket(); err != nil {
fmt.Println("bus.NewSocket error. ", err)
os.Exit(1)
}
TargetUUID := "node-A"
TargetAddr := "tcp://192.168.0.172:60000" // this should be changed to a available address
MyDial(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyClose(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyDial(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyClose(TargetUUID, TargetAddr)
time.Sleep(100 * time.Second)
}
func MyDial(TargetUUID string, TargetAddr string) (mangos.Dialer, error) {
_, is_exist := DialerMap[TargetUUID]
var err error
var dialer mangos.Dialer
if !is_exist {
dialer, err = sock.NewDialer(TargetAddr, opts)
if err != nil {
} else {
DialerMap[TargetUUID] = &dialer
}
}
dialer = *DialerMap[TargetUUID]
err = dialer.Dial()
if err != nil {
fmt.Println("Dialer fails to dial()", err)
} else {
fmt.Println("Dialer succeeds to dial()")
}
return dialer, err
}
func MyClose(TargetUUID string, TargetAddr string) {
dialerAddr, is_exist := DialerMap[TargetUUID]
if !is_exist {
fmt.Println("Dialer does not exist")
}
dialer := *dialerAddr
err := dialer.Close()
if err != nil {
fmt.Println("dialer fails to close.", err)
} else {
fmt.Println("dialer succeeds to close")
}
}
控制台输出为
Dialer succeeds to dial()
dialer succeeds to close
Dialer fails to dial() address in use
dialer fails to close. object closed
我通常不会关注 Whosebug 或 reddit 上的此类问题——我们有一个 Discord 频道(link 来自 mangos 和 NNG 主页),以及一个邮件列表。
话虽如此,让我看看是否可以提供帮助(我是NNG和芒果的作者):
- 总线支持 OptionRecvDeadline。但是,您是正确的,它不支持具有负值的非阻塞模式,相反,负值被视为与零相同,并且充当阻塞。这是一个文档错误。要实现逻辑非阻塞,请使用值“1”,这意味着一纳秒,逻辑上等同于非阻塞,尽管粒度 可能 受调度程序延迟限制. (在这种情况下,就像执行“关闭(通道);<-通道”——非常接近非阻塞。
我会考虑修复文档。
在拨号器上调用 Close() 是正确的做法。它会一直持续到管道关闭,它会自动关闭。您使用 非常 很短的重拨时间可能会混淆这一点——老实说,我没有考虑过短的重拨时间——通常这是不好的形式这是因为这意味着如果对等方不可用,您的代码将在处理器上努力尝试重新连接。我通常建议 至少 10 毫秒的重试间隔上限。 (mangos.OptionMaxReconnectTime)
我认为您看到了排队的影响,但我不是 100% 确定——我需要看到一个重现它的测试用例。毫无疑问,总线协议是尽力而为的传递,如果没有连接的对等点,则消息将被丢弃在地板上。 (只是重新检查以确定。)
感谢@Garrett D'Amore
的回复,我现在可以用另一种方式解决我的问题,我(作为一个对底层通信层知之甚少的Golang新手) 很抱歉用这么初级和愚蠢的问题来打扰你。
问题(1)作者回答的很好
问题(3)可能与问题(2)相结合,因为作者给出了如下机制,从而消除了发送缓冲累积的可能性。
Definitely the bus protocol is best effort delivery, and if there are no connected peers then the message is dropped on the floor. (Just rechecked that to be certain.)
问题(2),我第一次尝试将mangos.OptionMaxReconnectTime
设置为100 ms
,但问题依旧存在。第二次,我尝试了各种options
组合来配置socket和拨号器,但都失败了。
最后,由于作者指出
Calling Close() on the dialer is the right thing to do. It will linger until the pipes are closed, which it does automatically. It is possible that your use of a very short redial time might confound this.
我转向另一种关闭旧拨号器的方法,即明确关闭它拥有的所有管道。为此,可以定义一个回调处理程序,例如
var pipe_c chan
func callbackHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
pAddr := &pipe
pipe_c <- pAddr
}
然后将回调处理程序附加到套接字
sock.SetPipeEventHook(callbackHandler)
通过这样做,(private var) 管道可以被用户获取。当一个人想关闭拨号连接时,他或她可以这样做
dialer.Close() // try best to close a dialer automatically
for pAddr, num := range pipeSet {
(*pAddr).Close() // explicitly close all the pipes of the dialer
}
并且不要管“伪封闭”拨号器。当想再次连接到远程地址时,可以创建并使用一个新的拨号器。
我不知道旧的“伪封闭”拨号器是否会在内存中累积。但这已经是我能找到的唯一解决方案了。
我想用nanomsg
/nng
作为全分布式点对点多节点网络的通信基础,帮助构建拓扑发现的动态能力和维护。现在我陷入了它的 Golang 包 mangos
.
Python 和 pynng(它是 nanomsg 的 python 绑定)也做了同样的工作,但是当我使用 Go 并通过 mangos 调用相应的方法时,它们的行为是完全不同。谜题主要有三层:
- bus-type-Socket 的Recv() 默认以阻塞模式运行,似乎不可配置为非阻塞模式。文件说:
OptionRecvDeadline is the time until the next Recv times out. The value is a time.Duration. Zero value may be passed to indicate that no timeout should be applied. A negative value indicates a non-blocking operation. By default there is no timeout.
我相应地尝试了 负值,但 Recv()
仍然阻塞。我还应该做什么?以及如何理解“零超时”和“非阻塞”的区别?
(s *socket) NewDialer(...)
返回的dialer
似乎在调用dialer.Close()
后仍然存在,因为调用下一个dialer.Dial()
时会发生错误,报告它仍然是“地址在利用”。但是当我再次尝试Close()
和dialer
时,错误发生并报告它已经关闭。我还尝试了以下选项的不同组合,但所有尝试都失败了
opts := make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true // or false
opts[mangos.OptionMaxReconnectTime] = time.Millisecond // or zero
opts[mangos.OptionKeepAliveTime] = time.Millisecond // or even smaller
opts[mangos.OptionKeepAlive] = false // or true
想彻底杀掉拨号器,或者过段时间想重用“伪关闭”拨号器怎么办?
- bus-type-Socket 的
Send()
很奇怪。通常每个节点都应该在我的代码中定期发送消息。我关闭了一个节点(比如“Node-X”)与网络的物理连接,让它离线一段时间,然后重新连接到网络。我发现 Node-X 在重新连接时会立即重新发送大量消息。但我真正期望的是,即使没有邻居,Node-X 也可以将这些消息发送到空中。
不知有什么办法可以克服这些问题。我猜它可能缺少一些选项或配置,但我没有弄明白。
以下代码用于再现重拨和重关闭错误。
package main
import (
"fmt"
"os"
"time"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol/bus"
// register transports
_ "go.nanomsg.org/mangos/v3/transport/all"
)
var (
sock mangos.Socket
DialerMap map[string]*mangos.Dialer
opts map[string]interface{}
)
func main() {
var err error
opts = make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true
opts[mangos.OptionMaxReconnectTime] = time.Millisecond
// opts[mangos.OptionKeepAliveTime] = time.Millisecond
opts[mangos.OptionKeepAlive] = false
DialerMap = make(map[string]*mangos.Dialer)
if sock, err = bus.NewSocket(); err != nil {
fmt.Println("bus.NewSocket error. ", err)
os.Exit(1)
}
TargetUUID := "node-A"
TargetAddr := "tcp://192.168.0.172:60000" // this should be changed to a available address
MyDial(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyClose(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyDial(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyClose(TargetUUID, TargetAddr)
time.Sleep(100 * time.Second)
}
func MyDial(TargetUUID string, TargetAddr string) (mangos.Dialer, error) {
_, is_exist := DialerMap[TargetUUID]
var err error
var dialer mangos.Dialer
if !is_exist {
dialer, err = sock.NewDialer(TargetAddr, opts)
if err != nil {
} else {
DialerMap[TargetUUID] = &dialer
}
}
dialer = *DialerMap[TargetUUID]
err = dialer.Dial()
if err != nil {
fmt.Println("Dialer fails to dial()", err)
} else {
fmt.Println("Dialer succeeds to dial()")
}
return dialer, err
}
func MyClose(TargetUUID string, TargetAddr string) {
dialerAddr, is_exist := DialerMap[TargetUUID]
if !is_exist {
fmt.Println("Dialer does not exist")
}
dialer := *dialerAddr
err := dialer.Close()
if err != nil {
fmt.Println("dialer fails to close.", err)
} else {
fmt.Println("dialer succeeds to close")
}
}
控制台输出为
Dialer succeeds to dial()
dialer succeeds to close
Dialer fails to dial() address in use
dialer fails to close. object closed
我通常不会关注 Whosebug 或 reddit 上的此类问题——我们有一个 Discord 频道(link 来自 mangos 和 NNG 主页),以及一个邮件列表。
话虽如此,让我看看是否可以提供帮助(我是NNG和芒果的作者):
- 总线支持 OptionRecvDeadline。但是,您是正确的,它不支持具有负值的非阻塞模式,相反,负值被视为与零相同,并且充当阻塞。这是一个文档错误。要实现逻辑非阻塞,请使用值“1”,这意味着一纳秒,逻辑上等同于非阻塞,尽管粒度 可能 受调度程序延迟限制. (在这种情况下,就像执行“关闭(通道);<-通道”——非常接近非阻塞。
我会考虑修复文档。
在拨号器上调用 Close() 是正确的做法。它会一直持续到管道关闭,它会自动关闭。您使用 非常 很短的重拨时间可能会混淆这一点——老实说,我没有考虑过短的重拨时间——通常这是不好的形式这是因为这意味着如果对等方不可用,您的代码将在处理器上努力尝试重新连接。我通常建议 至少 10 毫秒的重试间隔上限。 (mangos.OptionMaxReconnectTime)
我认为您看到了排队的影响,但我不是 100% 确定——我需要看到一个重现它的测试用例。毫无疑问,总线协议是尽力而为的传递,如果没有连接的对等点,则消息将被丢弃在地板上。 (只是重新检查以确定。)
感谢@Garrett D'Amore
的回复,我现在可以用另一种方式解决我的问题,我(作为一个对底层通信层知之甚少的Golang新手) 很抱歉用这么初级和愚蠢的问题来打扰你。
问题(1)作者回答的很好
问题(3)可能与问题(2)相结合,因为作者给出了如下机制,从而消除了发送缓冲累积的可能性。
Definitely the bus protocol is best effort delivery, and if there are no connected peers then the message is dropped on the floor. (Just rechecked that to be certain.)
问题(2),我第一次尝试将mangos.OptionMaxReconnectTime
设置为100 ms
,但问题依旧存在。第二次,我尝试了各种options
组合来配置socket和拨号器,但都失败了。
最后,由于作者指出
Calling Close() on the dialer is the right thing to do. It will linger until the pipes are closed, which it does automatically. It is possible that your use of a very short redial time might confound this.
我转向另一种关闭旧拨号器的方法,即明确关闭它拥有的所有管道。为此,可以定义一个回调处理程序,例如
var pipe_c chan
func callbackHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
pAddr := &pipe
pipe_c <- pAddr
}
然后将回调处理程序附加到套接字
sock.SetPipeEventHook(callbackHandler)
通过这样做,(private var) 管道可以被用户获取。当一个人想关闭拨号连接时,他或她可以这样做
dialer.Close() // try best to close a dialer automatically
for pAddr, num := range pipeSet {
(*pAddr).Close() // explicitly close all the pipes of the dialer
}
并且不要管“伪封闭”拨号器。当想再次连接到远程地址时,可以创建并使用一个新的拨号器。
我不知道旧的“伪封闭”拨号器是否会在内存中累积。但这已经是我能找到的唯一解决方案了。