Go-Stomp读取超时如何解决
How to solve Go-Stomp read timeout
尝试使用 Go-Stomp 订阅 ActiveMQ(Apollo),但出现读取超时错误。我的应用程序应该每天 24 小时运行以处理传入的消息。
问题:
- 有没有办法在队列中没有更多消息的情况下保持订阅?尝试放置 ConnOpt.HeartBeat 似乎也不起作用
- 为什么读取超时后,我好像还接受了一条消息?
以下是我的步骤:
- 我在输入队列中放入了 1000 条消息进行测试
- 运行 订阅者,下面提供代码
- 订阅者阅读完 1000 条消息 2-3 秒后,看到错误“2016/10/07 17:12:44 订阅 1:/queue/hflc-in:错误 message:read 超时”。
- 再添加 1000 条消息,但似乎订阅已经关闭,因此没有消息未被处理
我的代码:
var(
serverAddr = flag.String("server", "10.92.10.10:61613", "STOMP server endpoint")
messageCount = flag.Int("count", 10, "Number of messages to send/receive")
inputQ = flag.String("inputq", "/queue/hflc-in", "Input queue")
)
var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
stomp.ConnOpt.Login("userid", "userpassword"),
stomp.ConnOpt.Host("mybroker"),
stomp.ConnOpt.HeartBeat(360*time.Second, 360*time.Second), // I put this but seems no impact
}
func main() {
flag.Parse()
jobschan := make(chan bean.Request, 10)
//my init setup
go getInput(1, jobschan)
}
func getInput(id int, jobschan chan bean.Request) {
conn, err := stomp.Dial("tcp", *serverAddr, options...)
if err != nil {
println("cannot connect to server", err.Error())
return
}
fmt.Printf("Connected %v \n", id)
sub, err := conn.Subscribe(*inputQ, stomp.AckClient)
if err != nil {
println("cannot subscribe to", *inputQ, err.Error())
return
}
fmt.Printf("Subscribed %v \n", id)
var messageCount int
for {
msg := <-sub.C
//expectedText := fmt.Sprintf("Message #%d", i)
if msg != nil {
actualText := string(msg.Body)
var req bean.Request
if actualText != "SHUTDOWN" {
messageCount = messageCount + 1
var err2 = easyjson.Unmarshal([]byte(actualText), &req)
if err2 != nil {
log.Error("Unable unmarshall", zap.Error(err))
println("message body %v", msg.Body) // what is [0/0]0x0 ?
} else {
fmt.Printf("Subscriber %v received message, count %v \n ", id, messageCount)
jobschan <- req
}
} else {
logchan <- "got some issue"
}
}
}
}
错误:
2016/10/07 17:12:44 Subscription 1: /queue/hflc-in: ERROR message:read timeout
[E] 2016-10-07T09:12:44Z Unable unmarshall
message body %v [0/0]0x0
通过添加这些行解决:
在 Apollo 中,注意到队列在几秒后为空后被删除,因此将 auto_delete_after 放入 apollo.xml 几个小时,例如:
<queue id="hflc-in" dlq="dlq-in" nak_limit="3" auto_delete_after="7200"/>
<queue id="hflc-log" dlq="dlq-log" nak_limit="3" auto_delete_after="7200"/>
<queue id="hflc-out" dlq="dlq-out" nak_limit="3" auto_delete_after="7200"/>
在 Go 中,注意到 go-stomp 在队列中找不到任何消息后会立即放弃,因此在 conn 选项中,添加 HeartBeat Error
var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
//.... original configuration
stomp.ConnOpt.HeartBeatError(360 * time.Second),
}
不过对第2题还是一头雾水
关于 #2 我遇到了同样的问题,因为我将我的进程包装在一个无限循环中。我发现的是,在“最后一条消息”上,您从频道订阅中获得了一条空消息的响应,但包含超时错误。这是为了能够优雅地处理断开连接。这是我在我的应用程序上实现它的方式
func process(subscription *stomp.Subscription) (error) {
log.Println("Waiting for a message")
msg := <- subscription.C
if msg.Body != nil {
log.Println("Message from the queue: ", string(msg.Body))
} else {
log.Println("message is empty")
log.Println("error consuming more messages", msg.Err.Error())
return msg.Err
}
return nil
}
尝试使用 Go-Stomp 订阅 ActiveMQ(Apollo),但出现读取超时错误。我的应用程序应该每天 24 小时运行以处理传入的消息。
问题:
- 有没有办法在队列中没有更多消息的情况下保持订阅?尝试放置 ConnOpt.HeartBeat 似乎也不起作用
- 为什么读取超时后,我好像还接受了一条消息?
以下是我的步骤:
- 我在输入队列中放入了 1000 条消息进行测试
- 运行 订阅者,下面提供代码
- 订阅者阅读完 1000 条消息 2-3 秒后,看到错误“2016/10/07 17:12:44 订阅 1:/queue/hflc-in:错误 message:read 超时”。
- 再添加 1000 条消息,但似乎订阅已经关闭,因此没有消息未被处理
我的代码:
var(
serverAddr = flag.String("server", "10.92.10.10:61613", "STOMP server endpoint")
messageCount = flag.Int("count", 10, "Number of messages to send/receive")
inputQ = flag.String("inputq", "/queue/hflc-in", "Input queue")
)
var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
stomp.ConnOpt.Login("userid", "userpassword"),
stomp.ConnOpt.Host("mybroker"),
stomp.ConnOpt.HeartBeat(360*time.Second, 360*time.Second), // I put this but seems no impact
}
func main() {
flag.Parse()
jobschan := make(chan bean.Request, 10)
//my init setup
go getInput(1, jobschan)
}
func getInput(id int, jobschan chan bean.Request) {
conn, err := stomp.Dial("tcp", *serverAddr, options...)
if err != nil {
println("cannot connect to server", err.Error())
return
}
fmt.Printf("Connected %v \n", id)
sub, err := conn.Subscribe(*inputQ, stomp.AckClient)
if err != nil {
println("cannot subscribe to", *inputQ, err.Error())
return
}
fmt.Printf("Subscribed %v \n", id)
var messageCount int
for {
msg := <-sub.C
//expectedText := fmt.Sprintf("Message #%d", i)
if msg != nil {
actualText := string(msg.Body)
var req bean.Request
if actualText != "SHUTDOWN" {
messageCount = messageCount + 1
var err2 = easyjson.Unmarshal([]byte(actualText), &req)
if err2 != nil {
log.Error("Unable unmarshall", zap.Error(err))
println("message body %v", msg.Body) // what is [0/0]0x0 ?
} else {
fmt.Printf("Subscriber %v received message, count %v \n ", id, messageCount)
jobschan <- req
}
} else {
logchan <- "got some issue"
}
}
}
}
错误:
2016/10/07 17:12:44 Subscription 1: /queue/hflc-in: ERROR message:read timeout
[E] 2016-10-07T09:12:44Z Unable unmarshall
message body %v [0/0]0x0
通过添加这些行解决:
在 Apollo 中,注意到队列在几秒后为空后被删除,因此将 auto_delete_after 放入 apollo.xml 几个小时,例如:
<queue id="hflc-in" dlq="dlq-in" nak_limit="3" auto_delete_after="7200"/>
<queue id="hflc-log" dlq="dlq-log" nak_limit="3" auto_delete_after="7200"/>
<queue id="hflc-out" dlq="dlq-out" nak_limit="3" auto_delete_after="7200"/>
在 Go 中,注意到 go-stomp 在队列中找不到任何消息后会立即放弃,因此在 conn 选项中,添加 HeartBeat Error
var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
//.... original configuration
stomp.ConnOpt.HeartBeatError(360 * time.Second),
}
不过对第2题还是一头雾水
关于 #2 我遇到了同样的问题,因为我将我的进程包装在一个无限循环中。我发现的是,在“最后一条消息”上,您从频道订阅中获得了一条空消息的响应,但包含超时错误。这是为了能够优雅地处理断开连接。这是我在我的应用程序上实现它的方式
func process(subscription *stomp.Subscription) (error) {
log.Println("Waiting for a message")
msg := <- subscription.C
if msg.Body != nil {
log.Println("Message from the queue: ", string(msg.Body))
} else {
log.Println("message is empty")
log.Println("error consuming more messages", msg.Err.Error())
return msg.Err
}
return nil
}