Go 中的 RabbitMQ 消费者
RabbitMQ consumer in Go
我正在尝试用 Go 编写 RabbitMQ 消费者。假设一次从队列中取出 5 个对象并处理它们。而且假设成功处理else发送5次到死信队列然后丢弃,应该是运行无限处理消费者的取消事件。
我有几个问题:
- RabbitMq-go中有
BasicConsumer
vs EventingBasicConsumer
的概念吗Reference?
- RabbitMQ 中的
Model
是什么,RabbitMq-go 中有吗?
- 如何将对象发送到死信队列失败并在
ttl
后重新排队
- 下面代码中
ch.Consume
函数中 consumerTag
参数的意义是什么
- 对于这种情况,我们应该使用
channel.Get()
还是 channel.Consume()
?
我需要在下面的代码中进行哪些更改才能满足上述要求。我问这个是因为我找不到合适的 RabbitMq-Go 文档。
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()
log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}
已编辑问题:
我已按照链接 link1 link2 中的建议延迟处理消息。但问题是即使在 ttl 之后,消息也会从死信队列返回到它们的原始队列。我正在使用 RabbitMQ 3.0.0
。谁能指出问题所在?
Is there any concept of BasicConsumer vs EventingBasicConsumer in
RabbitMq-go Reference?
不完全是,但是 Channel.Get
和 Channel.Consume
调用服务于类似的概念。使用 Channel.Get
时,您有一个 non-blocking 调用,如果有任何可用消息,该调用将获取第一条消息,或者 returns ok=false
。使用 Channel.Consume
,queued 条消息被传送到频道。
What is Model in RabbitMQ and is it there in RabbitMq-go?
如果您指的是 C# RabbitMQ 中的 IModel
和 Connection.CreateModel
,那是 C# 库中的内容,而不是 RabbitMQ 本身的内容。它只是试图从 RabbitMQ "Channel" 术语中抽象出来,但它从未流行起来。
How to send the objects when failed to dead-letter queue and again
re-queue them after ttl
将 delivery.Nack 方法与 requeue=false
结合使用。
What is the significance of consumerTag argument in the ch.Consume
function in the below code
ConsumerTag
只是一个消费者标识符。可用于取消带channel.Cancel的通道,并标识负责配送的消费者。使用 channel.Consume
传送的所有消息都将设置 ConsumerTag
字段。
Should we use the channel.Get()
or channel.Consume()
for this scenario?
我认为 channel.Get()
几乎永远不会优于 channel.Consume()
。使用 channel.Get
你将轮询 queue 并浪费 CPU 什么都不做,这在 Go 中没有意义。
What are the changes i need to make in the below code to meet above
requirement.
因为你一次批处理 5 个,你可以有一个从消费者渠道接收的 goroutine,一旦它收到 5 个交付,你就调用另一个函数来处理它们。
要确认或发送给 dead-letter queue,您将使用 delivery.Ack 或 delivery.Nack 函数。您可以使用 multiple=true
并为批处理调用一次。一旦消息进入死信 queue,您必须检查 delivery.Headers["x-death"]
header 它被 dead-letter 编辑了多少次并调用 delivery.Reject 已经重试5次了。
使用channel.NotifyCancel处理取消事件。
我正在尝试用 Go 编写 RabbitMQ 消费者。假设一次从队列中取出 5 个对象并处理它们。而且假设成功处理else发送5次到死信队列然后丢弃,应该是运行无限处理消费者的取消事件。 我有几个问题:
- RabbitMq-go中有
BasicConsumer
vsEventingBasicConsumer
的概念吗Reference? - RabbitMQ 中的
Model
是什么,RabbitMq-go 中有吗? - 如何将对象发送到死信队列失败并在
ttl
后重新排队
- 下面代码中
ch.Consume
函数中consumerTag
参数的意义是什么 - 对于这种情况,我们应该使用
channel.Get()
还是channel.Consume()
?
我需要在下面的代码中进行哪些更改才能满足上述要求。我问这个是因为我找不到合适的 RabbitMq-Go 文档。
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()
log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}
已编辑问题:
我已按照链接 link1 link2 中的建议延迟处理消息。但问题是即使在 ttl 之后,消息也会从死信队列返回到它们的原始队列。我正在使用 RabbitMQ 3.0.0
。谁能指出问题所在?
Is there any concept of BasicConsumer vs EventingBasicConsumer in RabbitMq-go Reference?
不完全是,但是 Channel.Get
和 Channel.Consume
调用服务于类似的概念。使用 Channel.Get
时,您有一个 non-blocking 调用,如果有任何可用消息,该调用将获取第一条消息,或者 returns ok=false
。使用 Channel.Consume
,queued 条消息被传送到频道。
What is Model in RabbitMQ and is it there in RabbitMq-go?
如果您指的是 C# RabbitMQ 中的 IModel
和 Connection.CreateModel
,那是 C# 库中的内容,而不是 RabbitMQ 本身的内容。它只是试图从 RabbitMQ "Channel" 术语中抽象出来,但它从未流行起来。
How to send the objects when failed to dead-letter queue and again re-queue them after ttl
将 delivery.Nack 方法与 requeue=false
结合使用。
What is the significance of consumerTag argument in the ch.Consume function in the below code
ConsumerTag
只是一个消费者标识符。可用于取消带channel.Cancel的通道,并标识负责配送的消费者。使用 channel.Consume
传送的所有消息都将设置 ConsumerTag
字段。
Should we use the
channel.Get()
orchannel.Consume()
for this scenario?
我认为 channel.Get()
几乎永远不会优于 channel.Consume()
。使用 channel.Get
你将轮询 queue 并浪费 CPU 什么都不做,这在 Go 中没有意义。
What are the changes i need to make in the below code to meet above requirement.
因为你一次批处理 5 个,你可以有一个从消费者渠道接收的 goroutine,一旦它收到 5 个交付,你就调用另一个函数来处理它们。
要确认或发送给 dead-letter queue,您将使用 delivery.Ack 或 delivery.Nack 函数。您可以使用
multiple=true
并为批处理调用一次。一旦消息进入死信 queue,您必须检查delivery.Headers["x-death"]
header 它被 dead-letter 编辑了多少次并调用 delivery.Reject 已经重试5次了。使用channel.NotifyCancel处理取消事件。