如何让 ZeroMQ 使出站排队但未在设定时间内发送的消息超时?
How to have ZeroMQ to timeout messages that are outbound queued, but haven't been sent within a set time?
我在服务器上有一个应用程序 运行,它从 phone 应用程序接收请求,然后在工作服务器之间对请求进行负载平衡。我正在尝试添加超时,以防主服务器上已在出站队列中超时长度的消息从队列中删除。更具体地说,主服务器上的应用程序是用golang编写的,并实现了Paranoid Pirate Pattern负载均衡。我目前的代码是:
import (
"fmt"
zmq "github.com/pebbe/zmq4"
"time"
)
const (
HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1500 * time.Millisecond
MESSAGE_READY = "[=10=]1"
MESSAGE_HEARTBEAT = "[=10=]2"
)
var (
client *zmq.Socket
backend *zmq.Socket
frontend *zmq.Socket
workerPoller *zmq.Poller
brokerPoller *zmq.Poller
workerQueue []Worker
)
type Worker struct {
Id string
Expire time.Time
}
type RequestWrapper {
RequestToSend Request
}
func NewWorker(id string) Worker {
return Worker{
Id: id,
Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
}
}
func AddReadyWorker(workers []Worker, worker Worker) []Worker {
fmt.Println(worker.Id, " joined")
for i, w := range workers {
if worker.Id == w.Id {
if i == 0 {
workers = workers[1:]
} else if i == len(workers)-1 {
workers = workers[:i]
} else {
workers = append(workers[:i], workers[i+1:]...)
}
break
}
}
return append(workers, worker)
}
func PurgeInactiveWorkers() {
now := time.Now()
for i, worker := range workerQueue {
if now.Before(worker.Expire) {
workerQueue = workerQueue[i:]
return
}
}
workerQueue = workerQueue[0:0]
}
func LoadBalance() {
// Loop:
heartbeat := time.Tick(HEARTBEAT_INTERVAL)
for {
var sockets []zmq.Polled
// If you have available workers, poll on the both front and backend
// If not poll on backend with infinite timeout
if len(workerQueue) > 0 {
sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
} else {
sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
}
for _, socket := range sockets {
switch socket.Socket {
// backend is a router
case backend:
workerId, _ := backend.Recv(0)
workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
clientId, _ := backend.Recv(0)
if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
route, _ := backend.Recv(0)
message, _ := backend.RecvBytes(0)
fmt.Println("Received response")
RouteResponse(route, message)
// frontend.Send(clientId, zmq.SNDMORE)
// frontend.Send("", zmq.SNDMORE)
// frontend.SendBytes(message, 0)
}
// frontend is a dealer
case frontend:
clientId, _ := frontend.Recv(0)
route, _ := frontend.Recv(0)
message, _ := frontend.RecvBytes(0)
backend.Send(workerQueue[0].Id, zmq.SNDMORE)
backend.Send(clientId, zmq.SNDMORE)
backend.Send(route, zmq.SNDMORE)
backend.SendBytes(message, 0)
workerQueue = workerQueue[1:]
}
}
select {
case <-heartbeat:
for _, worker := range workerQueue {
backend.Send(worker.Id, zmq.SNDMORE)
backend.Send(MESSAGE_HEARTBEAT, 0)
}
break
default:
}
PurgeInactiveWorkers()
}
}
如果后端发送了一条消息,但在一段时间内没有真正发送给工作人员,我希望它过期并且永远不会发送。 是否有可以完成此操作的套接字选项?如果没有,我需要做什么才能做到这一点?
我认为可以在没有套接字选项的情况下执行此操作的两种方法是:
1) 让后端将消息包装在包装器中并发送到 golang 队列,而不是通过 zeromq。包装器包含消息 "sent" 的时间。后端同时从golang队列前端拉取一条消息,检查消息是否过期。如果是,请不要发送,如果不是,请发送消息。我可以让后端先将消息添加到 golang 队列,然后在同一代码块中真正将其发送出去。这样,我就不需要锁了。
2) 通过 zeromq 将包装器消息发送到检索器,检索器检查其是否过期并且 returns 提前。我不喜欢这个,因为它似乎对性能不利。
在较新的 API 版本中,有一个选项可以丢弃所有 "old" 消息并始终只传送 "newest" 一个。
如果这符合您的期望,并且如果所有同行都满足 API v.4.0+,那么您就完成了。
ZMQ_CONFLATE
: Keep only last message
If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. Ignores ZMQ_RCVHWM
and ZMQ_SNDHWM
options. Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.
Option value type
int
Option value unit
boolean
Default value
0 (false)
Applicable socket types
ZMQ_PULL
, ZMQ_PUSH
, ZMQ_SUB
, ZMQ_PUB
, ZMQ_DEALER
您要做的是将通信用作执行集合点。发件人想知道收件人何时收到消息。
ZMQ实现了Actor模型。您需要修改通信顺序进程模型(发送超时的模型)。基本上你需要添加控制消息流 to/from 工人,想法是服务器要求工人接收消息,服务器等待回复。回复意味着工作人员已准备好立即接收消息,并且服务器和工作人员都在其程序流程中的 send/receive 会合。如果该回复未能在超时秒内到达,则服务器不会发送实际消息。
或者您可以通过将所有内容都交给工作人员来欺骗,包装在带有 "sent at time X" 字段的消息中,并让工作人员决定丢弃旧消息。
最后,解决方案是像@colini 和@bazza 提到的那样添加一个 expires-at 属性,并在每次心跳后从队列中删除超时消息。然而,这样做并满足我的应用程序的所有要求被证明比第一眼看起来更困难,所以我最终使用了 RabbitMQ,它的 ttl-expires 参数提供了所需的功能。
我在服务器上有一个应用程序 运行,它从 phone 应用程序接收请求,然后在工作服务器之间对请求进行负载平衡。我正在尝试添加超时,以防主服务器上已在出站队列中超时长度的消息从队列中删除。更具体地说,主服务器上的应用程序是用golang编写的,并实现了Paranoid Pirate Pattern负载均衡。我目前的代码是:
import (
"fmt"
zmq "github.com/pebbe/zmq4"
"time"
)
const (
HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1500 * time.Millisecond
MESSAGE_READY = "[=10=]1"
MESSAGE_HEARTBEAT = "[=10=]2"
)
var (
client *zmq.Socket
backend *zmq.Socket
frontend *zmq.Socket
workerPoller *zmq.Poller
brokerPoller *zmq.Poller
workerQueue []Worker
)
type Worker struct {
Id string
Expire time.Time
}
type RequestWrapper {
RequestToSend Request
}
func NewWorker(id string) Worker {
return Worker{
Id: id,
Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
}
}
func AddReadyWorker(workers []Worker, worker Worker) []Worker {
fmt.Println(worker.Id, " joined")
for i, w := range workers {
if worker.Id == w.Id {
if i == 0 {
workers = workers[1:]
} else if i == len(workers)-1 {
workers = workers[:i]
} else {
workers = append(workers[:i], workers[i+1:]...)
}
break
}
}
return append(workers, worker)
}
func PurgeInactiveWorkers() {
now := time.Now()
for i, worker := range workerQueue {
if now.Before(worker.Expire) {
workerQueue = workerQueue[i:]
return
}
}
workerQueue = workerQueue[0:0]
}
func LoadBalance() {
// Loop:
heartbeat := time.Tick(HEARTBEAT_INTERVAL)
for {
var sockets []zmq.Polled
// If you have available workers, poll on the both front and backend
// If not poll on backend with infinite timeout
if len(workerQueue) > 0 {
sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
} else {
sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
}
for _, socket := range sockets {
switch socket.Socket {
// backend is a router
case backend:
workerId, _ := backend.Recv(0)
workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
clientId, _ := backend.Recv(0)
if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
route, _ := backend.Recv(0)
message, _ := backend.RecvBytes(0)
fmt.Println("Received response")
RouteResponse(route, message)
// frontend.Send(clientId, zmq.SNDMORE)
// frontend.Send("", zmq.SNDMORE)
// frontend.SendBytes(message, 0)
}
// frontend is a dealer
case frontend:
clientId, _ := frontend.Recv(0)
route, _ := frontend.Recv(0)
message, _ := frontend.RecvBytes(0)
backend.Send(workerQueue[0].Id, zmq.SNDMORE)
backend.Send(clientId, zmq.SNDMORE)
backend.Send(route, zmq.SNDMORE)
backend.SendBytes(message, 0)
workerQueue = workerQueue[1:]
}
}
select {
case <-heartbeat:
for _, worker := range workerQueue {
backend.Send(worker.Id, zmq.SNDMORE)
backend.Send(MESSAGE_HEARTBEAT, 0)
}
break
default:
}
PurgeInactiveWorkers()
}
}
如果后端发送了一条消息,但在一段时间内没有真正发送给工作人员,我希望它过期并且永远不会发送。 是否有可以完成此操作的套接字选项?如果没有,我需要做什么才能做到这一点?
我认为可以在没有套接字选项的情况下执行此操作的两种方法是:
1) 让后端将消息包装在包装器中并发送到 golang 队列,而不是通过 zeromq。包装器包含消息 "sent" 的时间。后端同时从golang队列前端拉取一条消息,检查消息是否过期。如果是,请不要发送,如果不是,请发送消息。我可以让后端先将消息添加到 golang 队列,然后在同一代码块中真正将其发送出去。这样,我就不需要锁了。
2) 通过 zeromq 将包装器消息发送到检索器,检索器检查其是否过期并且 returns 提前。我不喜欢这个,因为它似乎对性能不利。
在较新的 API 版本中,有一个选项可以丢弃所有 "old" 消息并始终只传送 "newest" 一个。
如果这符合您的期望,并且如果所有同行都满足 API v.4.0+,那么您就完成了。
ZMQ_CONFLATE
: Keep only last message
If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. IgnoresZMQ_RCVHWM
andZMQ_SNDHWM
options. Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.
Option value type
int
Option value unit
boolean
Default value
0 (false)
Applicable socket types
ZMQ_PULL
,ZMQ_PUSH
,ZMQ_SUB
,ZMQ_PUB
,ZMQ_DEALER
您要做的是将通信用作执行集合点。发件人想知道收件人何时收到消息。
ZMQ实现了Actor模型。您需要修改通信顺序进程模型(发送超时的模型)。基本上你需要添加控制消息流 to/from 工人,想法是服务器要求工人接收消息,服务器等待回复。回复意味着工作人员已准备好立即接收消息,并且服务器和工作人员都在其程序流程中的 send/receive 会合。如果该回复未能在超时秒内到达,则服务器不会发送实际消息。
或者您可以通过将所有内容都交给工作人员来欺骗,包装在带有 "sent at time X" 字段的消息中,并让工作人员决定丢弃旧消息。
最后,解决方案是像@colini 和@bazza 提到的那样添加一个 expires-at 属性,并在每次心跳后从队列中删除超时消息。然而,这样做并满足我的应用程序的所有要求被证明比第一眼看起来更困难,所以我最终使用了 RabbitMQ,它的 ttl-expires 参数提供了所需的功能。