具有 XSub/Xpub 模式中控制套接字使用示例的 NetMQ 代理?
NetMQ Proxy with control socket usage example in XSub/Xpub pattern?
我正在尝试使用 NetMQ + 控制套接字(以控制集线器的行为)实现具有 XPUB/XSUB 模式的集线器。我使用代理,NetMqPoller,并且想使用控制套接字。但无论我尝试什么 - 它都不起作用。这是我的代码,为什么它不起作用?
Using xpubSocket As New XPublisherSocket("@tcp://127.0.0.1:1234")
Using xsubSocket As New XSubscriberSocket("@tcp://127.0.0.1:5678")
Using plr As New NetMQPoller()
Using ctrlIn As New StreamSocket(">tcp://127.0.0.1:5678")
AddHandler ctrlIn.ReceiveReady, AddressOf ctrlIn_ReceiveReady
plr.Add(xpubSocket)
plr.Add(xsubSocket)
plr.Add(ctrlIn)
Dim proxy As New Proxy(xsubSocket, xpubSocket, ctrlIn, plr)
proxy.Start()
plr.Run()
End Using
End Using
End Using
End Using
还有一个每次ctrlIn套接字接收数据时运行的方法:
Sub ctrlIn_ReceiveReady(sender As Object, e As NetMQSocketEventArgs)
Dim bytes() As Byte
While (e.Socket.TryReceiveFrameBytes(bytes))
Console.WriteLine("Received {0} bytes.", bytes.Length)
End While
End Sub
现在简短说明:集线器 (XPUB/XSUB) 工作完美,即当我启动发布者和订阅者时 - 我可以看到消息流动。但是控制套接字不起作用,我得到的只是其中的两条消息:
Received 5 bytes.
Received 10 bytes.
之后 - 控制套接字保持沉默,不再有字节流过它。
有谁知道我错在哪里?或者也许任何人都可以指出一个有效的例子?我一直在寻找一个示例,但找不到任何可以正常工作的控制套接字。
为什么control socket是Stream类型的?另外你应该在控制套接字的另一端有另一个套接字,现在你正在将流类型的控制套接字连接到发布者,流和发布者不能相互通信。
尝试这样的事情(抱歉语法错误,不是 VB 开发者)
Using xpubSocket As New XPublisherSocket("@tcp://127.0.0.1:1234")
Using xsubSocket As New XSubscriberSocket("@tcp://127.0.0.1:5678")
Using plr As New NetMQPoller()
Using ctrlOut As New Dealer("@inproc://control")
Using ctrlIn As New Dealer(">inproc://control")
AddHandler ctrlIn.ReceiveReady, AddressOf ctrlIn_ReceiveReady
plr.Add(xpubSocket)
plr.Add(xsubSocket)
plr.Add(ctrlIn)
Dim proxy As New Proxy(xsubSocket, xpubSocket, ctrlOut, plr)
proxy.Start()
plr.Run()
End Using
End Using
End Using
End Using
我正在尝试使用 NetMQ + 控制套接字(以控制集线器的行为)实现具有 XPUB/XSUB 模式的集线器。我使用代理,NetMqPoller,并且想使用控制套接字。但无论我尝试什么 - 它都不起作用。这是我的代码,为什么它不起作用?
Using xpubSocket As New XPublisherSocket("@tcp://127.0.0.1:1234")
Using xsubSocket As New XSubscriberSocket("@tcp://127.0.0.1:5678")
Using plr As New NetMQPoller()
Using ctrlIn As New StreamSocket(">tcp://127.0.0.1:5678")
AddHandler ctrlIn.ReceiveReady, AddressOf ctrlIn_ReceiveReady
plr.Add(xpubSocket)
plr.Add(xsubSocket)
plr.Add(ctrlIn)
Dim proxy As New Proxy(xsubSocket, xpubSocket, ctrlIn, plr)
proxy.Start()
plr.Run()
End Using
End Using
End Using
End Using
还有一个每次ctrlIn套接字接收数据时运行的方法:
Sub ctrlIn_ReceiveReady(sender As Object, e As NetMQSocketEventArgs)
Dim bytes() As Byte
While (e.Socket.TryReceiveFrameBytes(bytes))
Console.WriteLine("Received {0} bytes.", bytes.Length)
End While
End Sub
现在简短说明:集线器 (XPUB/XSUB) 工作完美,即当我启动发布者和订阅者时 - 我可以看到消息流动。但是控制套接字不起作用,我得到的只是其中的两条消息:
Received 5 bytes.
Received 10 bytes.
之后 - 控制套接字保持沉默,不再有字节流过它。 有谁知道我错在哪里?或者也许任何人都可以指出一个有效的例子?我一直在寻找一个示例,但找不到任何可以正常工作的控制套接字。
为什么control socket是Stream类型的?另外你应该在控制套接字的另一端有另一个套接字,现在你正在将流类型的控制套接字连接到发布者,流和发布者不能相互通信。
尝试这样的事情(抱歉语法错误,不是 VB 开发者)
Using xpubSocket As New XPublisherSocket("@tcp://127.0.0.1:1234")
Using xsubSocket As New XSubscriberSocket("@tcp://127.0.0.1:5678")
Using plr As New NetMQPoller()
Using ctrlOut As New Dealer("@inproc://control")
Using ctrlIn As New Dealer(">inproc://control")
AddHandler ctrlIn.ReceiveReady, AddressOf ctrlIn_ReceiveReady
plr.Add(xpubSocket)
plr.Add(xsubSocket)
plr.Add(ctrlIn)
Dim proxy As New Proxy(xsubSocket, xpubSocket, ctrlOut, plr)
proxy.Start()
plr.Run()
End Using
End Using
End Using
End Using