在 Fargate 中使用 ZeroMQ Golang
Using ZeroMQ Golang within Fargate
我正尝试在 Fargate 上以 awsvpc 模式在 ECS 运行 中使用 ZeroMQ。我有 2 个不同的服务,每个 运行 都有自己的任务并启用了服务发现。
我在一个名为 broker 的微服务中创建了我的路由器和经销商。
front, _ := zmq.NewSocket(zmq.ROUTER)
defer front.Close()
front.Bind("tcp://*:4070")
back, _ := zmq.NewSocket(zmq.DEALER)
defer back.Close()
back.Bind("tcp://*:4080")
然后我将这 2 个套接字添加到轮询器并有一个等待消息的 for 循环。
我有一个单独的微服务连接到套接字并尝试向经销商发送消息。我已经设置了服务发现,所以我假设我连接的地址是:
"tcp://broker:4070"
下面是来自'serviceA'
的代码
func New(ZMQ models.ZMQ) *Requester {
s, err := zmq.NewSocket(zmq.REQ)
if err != nil {
log.Fatalln("shareholder/requester zmq.NewSocket", err)
}
p := zmq.NewPoller()
p.Add(s, zmq.POLLIN)
log.Println("Requester", ZMQ.Req)
err = s.Connect("tcp://broker:4070")
if err != nil {
log.Print(fmt.Errorf("err is %w", err))
}
req := &Requester{
Poller: p,
Retries: 2,
Socket: s,
Timeout: time.Duration(time.Minute),
}
runtime.SetFinalizer(req, (*Requester).Close)
return req
}
然后我使用上面的代码通过我的套接字连接发送消息
_, err := r.Socket.SendMessage(req)
但是,我的消息从未在我的代理服务中收到。我可以使用我在服务发现期间注册的主机名在网络上访问我的 REST API,我在这里缺少什么 Fargate/ECS/ZeroMQ???
我会描述我的想法作为答案,我相信我们可以解决您的问题。
所以我认为这是你的设置。
服务 A
nginx
通过 backend:9000
调用 backend
backend
通过 nginx:80
调用 nginx
服务 A 到 B
nginx
无法 通过 broker1:4070
调用 broker1
nginx
和 backend
都不能仅通过指定 name:port 来调用 broker1
或 broker2
。
如果容器 运行 在不同的服务中并且每个服务都有自己的 awsvpc
你不能仅仅通过指定 name:port 来调用它们。
您需要一个连接跨服务,从 A 到 B,这意味着您需要适当的服务发现。
Q : "is there something I am missing here with Fargate/ECS/ZeroMQ???"
也许是,也许不是。
让我们以结构化的方式开始,深入了解根本原因:
第 0 步:Broker 服务节点
提到要使用ZeroMQ,那我们就从这里开始吧。鉴于您的选择是在地址 ( *:4070 )
上使用到 DEALER
的访问点和在地址 ( *:4080 )
上使用到 ROUTER
的访问点,并且两者都使用 .bind()
方法在 Broker-微服务节点中激活 tcp://
-Transport Class,我们的下一步是验证,是否和这个节点实际上 对世界其他地方可见。
所以,让它 运行。
第 1 步:视线测试
这是测试的第一步 - 是 Broker-Node,无论它的实现是什么,实际上对于“预期的”是可见的观众” ?如果没有,在ZeroMQ或者其他框架内部没有太多的事情要做,但是你的任务是获取地址,L1-信号互连,L2-arp/rarp MAC-detection/mapping,L3-路由permissions/access-lists/filters/xlations/etc ,(动态)DNS 更新和所有其他配置已更新,以便您能够让世界其他地方的(选择性部分)看到并离成功更近一步 .connect()
$ # is it L3-(in)-visible # a [ PASS ] | [ FAIL ]
$ ping <_a_Broker_Node_Assumed_TCP/IP_Address> # a [ PASS ] | [ FAIL ]
第 2 步:端口号 RTO 测试
$ # 4070 # a [ PASS ] | [ FAIL ]
$ netcat -vz <_a_Broker_Node_visible_TCP/IP_Address> 4070 # a [ PASS ] | [ FAIL ]
$ ######
$ # OR :
$ ######
$ telnet <_a_Broker_Node_visible_TCP/IP_Address> 4070 # a [ PASS ] | [ FAIL ]
Trying
Connected to
Escape character is '^]'.
https://<_a_Broker_Node_visible_TCP/IP_Address>:4070
HTTP/1.1 400 Bad Request
Server: nginx
Date: Mon, 03 May 2020 18:14:54 GMT
Content-Type: text/html
Content-Length: 150
Connection: close
<html>
<head><title>400 Bad Request</title></head>
<body>
<center><h1>400 Bad Request</h1></center>
<hr><center>nginx</center>
</body>
</html>
Connection closed by foreign host.
$
$ // 4080 // a [ PASS ] | [ FAIL ]
$ telnet <_a_Broker_Node_visible_TCP/IP_Address> 4080 // a [ PASS ] | [ FAIL ]
第 3 步:本地消息发送的 RTO 测试
替换相当复杂的 REQ/ROUTER
-Scalable Formal Communications Archetype Pattern 领域,让我们用一个简单的 PUSH/PULL
-消息传递测试来测试,这(出于显而易见的原因)符合发送消息的预期用途:
package main
import (
zmq "github.com/pebbe/zmq4"
"log"
"fmt"
"time"
...
)
func PushTASK() {
aCtx, err := zmq.NewContext()
if err != nil {
log.Fatalln( "__NACK: aCtx instantiation failed in zmq.NewContext()",
err )
}
aPusher, err := aCtx.NewSocket( zmq.PUSH )
if err != nil {
log.Fatalln( "__NACK: aPusher instantiation failed in aCtxNewSocket()",
err )
}
err = aPusher.SetLinger( 0 )
if err != nil {
log.Fatalln( "__NACK: aPusher instance failed to .SetLinger()",
err )
}
err = aPusher.SetConflate( true )
if err != nil {
log.Fatalln( "__NACK: aPusher instance failed to .SetConflate()",
err )
}
log.Println( "POSACK: aPusher instantiated and about to .connect( tcp://addr:port#)" )
err = aPusher.Connect( "tcp://broker:4070" )
if err != nil {
log.Print( fmt.Errorf( "__NACK: aPusher failed to .connect(): %w",
err )
)
}
log.Println( "POSACK: aPusher RTO and about to .SendMessage*()-loop" )
for aPush_NUMBER := 1; aPush_NUMBER < 10000; aPush_NUMBER++ {
err = aPusher.SendMessageDontwait( aPush_NUMBER )
if err != nil {
log.Print( fmt.Errorf( "__NACK: aPusher failed to .SendMessageDontwait()[%d]: %w",
aPush_NUMBER,
err )
)
}
time.Sleep( 0.1 * time.Second )
}
// ---------------------------------------------------BE NICE TO RESOURCES USED
err = aPusher.Disconnect( "tcp://broker:4070" )
if err != nil {
log.Print( fmt.Errorf( "__NACK: aPusher failed to .Disconnect( tcp://addr:port ): %w",
err )
)
}
// ---------------------------------------------------BE NICE TO RESOURCES USED
err = aPusher.Close()
if err != nil {
log.Print( fmt.Errorf( "__NACK: aPusher failed to .Close(): %w",
err )
)
}
// ---------------------------------------------------BE NICE TO RESOURCES USED
err = aCtx.Term()
if err != nil {
log.Print( fmt.Errorf( "__NACK: aCtx failed to .Term(): %w",
err )
)
}
// ---------------------------------------------------WE ARE CLEAR TO TERMINATE
}
第 4 步:远程消息接收的 RTO 测试
如果 [ PASS ] | [ FAIL ]
-测试的 none 崩溃了,下一步是反映“远程”PUSH
-端的概念 Broker,是的,重写它以使用 PULL
端并部署它以查看是否也没有崩溃以及消息是否按预期到达还是要在运行ning 或重新运行 Step 3.
第 5 步:享受 ZeroMQ 的强大功能
一旦上述所有测试确实执行 [ PASS ]
,您不仅可以确定 ZeroMQ 不是阻碍,而且可以将部署的原则增强到任何进一步的用例场景中,因为 L1-/L2-/L3-/ZeroMQ-services 以正确且可验证的方式实施。
我正尝试在 Fargate 上以 awsvpc 模式在 ECS 运行 中使用 ZeroMQ。我有 2 个不同的服务,每个 运行 都有自己的任务并启用了服务发现。
我在一个名为 broker 的微服务中创建了我的路由器和经销商。
front, _ := zmq.NewSocket(zmq.ROUTER)
defer front.Close()
front.Bind("tcp://*:4070")
back, _ := zmq.NewSocket(zmq.DEALER)
defer back.Close()
back.Bind("tcp://*:4080")
然后我将这 2 个套接字添加到轮询器并有一个等待消息的 for 循环。
我有一个单独的微服务连接到套接字并尝试向经销商发送消息。我已经设置了服务发现,所以我假设我连接的地址是:
"tcp://broker:4070"
下面是来自'serviceA'
的代码func New(ZMQ models.ZMQ) *Requester {
s, err := zmq.NewSocket(zmq.REQ)
if err != nil {
log.Fatalln("shareholder/requester zmq.NewSocket", err)
}
p := zmq.NewPoller()
p.Add(s, zmq.POLLIN)
log.Println("Requester", ZMQ.Req)
err = s.Connect("tcp://broker:4070")
if err != nil {
log.Print(fmt.Errorf("err is %w", err))
}
req := &Requester{
Poller: p,
Retries: 2,
Socket: s,
Timeout: time.Duration(time.Minute),
}
runtime.SetFinalizer(req, (*Requester).Close)
return req
}
然后我使用上面的代码通过我的套接字连接发送消息
_, err := r.Socket.SendMessage(req)
但是,我的消息从未在我的代理服务中收到。我可以使用我在服务发现期间注册的主机名在网络上访问我的 REST API,我在这里缺少什么 Fargate/ECS/ZeroMQ???
我会描述我的想法作为答案,我相信我们可以解决您的问题。
所以我认为这是你的设置。
服务 A
nginx
通过backend:9000
调用 backend
通过nginx:80
调用
backend
nginx
服务 A 到 B
nginx
无法 通过broker1:4070
调用 broker1
nginx
和backend
都不能仅通过指定 name:port 来调用broker1
或broker2
。
如果容器 运行 在不同的服务中并且每个服务都有自己的 awsvpc
你不能仅仅通过指定 name:port 来调用它们。
您需要一个连接跨服务,从 A 到 B,这意味着您需要适当的服务发现。
Q : "is there something I am missing here with Fargate/ECS/ZeroMQ???"
也许是,也许不是。
让我们以结构化的方式开始,深入了解根本原因:
第 0 步:Broker 服务节点
提到要使用ZeroMQ,那我们就从这里开始吧。鉴于您的选择是在地址 ( *:4070 )
上使用到 DEALER
的访问点和在地址 ( *:4080 )
上使用到 ROUTER
的访问点,并且两者都使用 .bind()
方法在 Broker-微服务节点中激活 tcp://
-Transport Class,我们的下一步是验证,是否和这个节点实际上 对世界其他地方可见。
所以,让它 运行。
第 1 步:视线测试
这是测试的第一步 - 是 Broker-Node,无论它的实现是什么,实际上对于“预期的”是可见的观众” ?如果没有,在ZeroMQ或者其他框架内部没有太多的事情要做,但是你的任务是获取地址,L1-信号互连,L2-arp/rarp MAC-detection/mapping,L3-路由permissions/access-lists/filters/xlations/etc ,(动态)DNS 更新和所有其他配置已更新,以便您能够让世界其他地方的(选择性部分)看到并离成功更近一步 .connect()
$ # is it L3-(in)-visible # a [ PASS ] | [ FAIL ]
$ ping <_a_Broker_Node_Assumed_TCP/IP_Address> # a [ PASS ] | [ FAIL ]
第 2 步:端口号 RTO 测试
$ # 4070 # a [ PASS ] | [ FAIL ]
$ netcat -vz <_a_Broker_Node_visible_TCP/IP_Address> 4070 # a [ PASS ] | [ FAIL ]
$ ######
$ # OR :
$ ######
$ telnet <_a_Broker_Node_visible_TCP/IP_Address> 4070 # a [ PASS ] | [ FAIL ]
Trying
Connected to
Escape character is '^]'.
https://<_a_Broker_Node_visible_TCP/IP_Address>:4070
HTTP/1.1 400 Bad Request
Server: nginx
Date: Mon, 03 May 2020 18:14:54 GMT
Content-Type: text/html
Content-Length: 150
Connection: close
<html>
<head><title>400 Bad Request</title></head>
<body>
<center><h1>400 Bad Request</h1></center>
<hr><center>nginx</center>
</body>
</html>
Connection closed by foreign host.
$
$ // 4080 // a [ PASS ] | [ FAIL ]
$ telnet <_a_Broker_Node_visible_TCP/IP_Address> 4080 // a [ PASS ] | [ FAIL ]
第 3 步:本地消息发送的 RTO 测试
替换相当复杂的 REQ/ROUTER
-Scalable Formal Communications Archetype Pattern 领域,让我们用一个简单的 PUSH/PULL
-消息传递测试来测试,这(出于显而易见的原因)符合发送消息的预期用途:
package main
import (
zmq "github.com/pebbe/zmq4"
"log"
"fmt"
"time"
...
)
func PushTASK() {
aCtx, err := zmq.NewContext()
if err != nil {
log.Fatalln( "__NACK: aCtx instantiation failed in zmq.NewContext()",
err )
}
aPusher, err := aCtx.NewSocket( zmq.PUSH )
if err != nil {
log.Fatalln( "__NACK: aPusher instantiation failed in aCtxNewSocket()",
err )
}
err = aPusher.SetLinger( 0 )
if err != nil {
log.Fatalln( "__NACK: aPusher instance failed to .SetLinger()",
err )
}
err = aPusher.SetConflate( true )
if err != nil {
log.Fatalln( "__NACK: aPusher instance failed to .SetConflate()",
err )
}
log.Println( "POSACK: aPusher instantiated and about to .connect( tcp://addr:port#)" )
err = aPusher.Connect( "tcp://broker:4070" )
if err != nil {
log.Print( fmt.Errorf( "__NACK: aPusher failed to .connect(): %w",
err )
)
}
log.Println( "POSACK: aPusher RTO and about to .SendMessage*()-loop" )
for aPush_NUMBER := 1; aPush_NUMBER < 10000; aPush_NUMBER++ {
err = aPusher.SendMessageDontwait( aPush_NUMBER )
if err != nil {
log.Print( fmt.Errorf( "__NACK: aPusher failed to .SendMessageDontwait()[%d]: %w",
aPush_NUMBER,
err )
)
}
time.Sleep( 0.1 * time.Second )
}
// ---------------------------------------------------BE NICE TO RESOURCES USED
err = aPusher.Disconnect( "tcp://broker:4070" )
if err != nil {
log.Print( fmt.Errorf( "__NACK: aPusher failed to .Disconnect( tcp://addr:port ): %w",
err )
)
}
// ---------------------------------------------------BE NICE TO RESOURCES USED
err = aPusher.Close()
if err != nil {
log.Print( fmt.Errorf( "__NACK: aPusher failed to .Close(): %w",
err )
)
}
// ---------------------------------------------------BE NICE TO RESOURCES USED
err = aCtx.Term()
if err != nil {
log.Print( fmt.Errorf( "__NACK: aCtx failed to .Term(): %w",
err )
)
}
// ---------------------------------------------------WE ARE CLEAR TO TERMINATE
}
第 4 步:远程消息接收的 RTO 测试
如果 [ PASS ] | [ FAIL ]
-测试的 none 崩溃了,下一步是反映“远程”PUSH
-端的概念 Broker,是的,重写它以使用 PULL
端并部署它以查看是否也没有崩溃以及消息是否按预期到达还是要在运行ning 或重新运行 Step 3.
第 5 步:享受 ZeroMQ 的强大功能
一旦上述所有测试确实执行 [ PASS ]
,您不仅可以确定 ZeroMQ 不是阻碍,而且可以将部署的原则增强到任何进一步的用例场景中,因为 L1-/L2-/L3-/ZeroMQ-services 以正确且可验证的方式实施。