如何从 go 传递消息并使用 rabbitmq 从 nestjs 消费它?
how to pass a message from go and consuming it from nestjs with rabbitmq?
我有一个 go
服务向 rabbitmq
发布消息,负责该部分的代码如下:
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, amqError := amqp.Dial("amqp://localhost:5672/")
if amqError != nil {
panic(amqError)
}
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
q, err := ch.QueueDeclare(
"default", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := "{ \"body\":\"Hello...\", \"pattern\":\"test\", \"age\":\"20\"}"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
failOnError(err, "Failed to publish a message")
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
消费消息的 Nestjs
部分如下:
import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, Payload, RmqContext } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor() { }
@EventPattern("test")
getEventMessage(@Payload() data: any, @Ctx() context: RmqContext) {
console.log("data is -> ", data) // always undefined
console.log(
"content of message is -> ",
JSON.parse(
context.getMessage().content.toString() // from buffer to string
)
)
}
}
现在的问题是我无法从数据中获取消息而不是从 ctx
中解析它,我还需要在不跳过双引号的情况下以 json
的形式发送消息这个"\""
以下是对您问题的回复:
- 在 NextJS 的示例中,他们没有提供如何使用有效载荷数据,而是 say:
To access the original RabbitMQ message (with the properties, fields,
and content), use the getMessage() method of the RmqContext
鉴于上述陈述,您正在正确解析队列中的消息。
- 避免手动发送正文字符串,您应该使用一个名为json Marhsal of struct 的过程,例如:
- 您应该创建一个包含要发送到队列的信息的结构
- 编组结构并生成
[]byte
type MessageQueue struct {
Body string `json:"body"`
Pattern string `json:"pattern"`
Age string `json:"age"`
Data string `json:"data"`
}
func NewMessageQueue(body, pattern, age string, data) *MessageQueue {
return &MessageQueue{
body, pattern, age, data
}
}
func (m *MessageQueue) Marshal() ([]byte, error) {
bytes, err := json.Marshal(m)
if err != nil {
return nil, err
}
return bytes, err
}
func main() {
...
message := NewMessageQueue("Hello...", "test", "20", "data...")
// TODO: check the error
body, _ := message.Marshal()
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: body,
},
)
...
}
更新:
- 控制器在
getEventMessage
方法上接收到的 data
参数,它应该从 Golang 发送到 body 上以被 Nestjs
反序列化。这意味着结构应如下所示:
type MessageQueue struct {
Body string `json:"body"`
Pattern string `json:"pattern"`
Age string `json:"age"`
Data string `json:"data"`
}
我有一个 go
服务向 rabbitmq
发布消息,负责该部分的代码如下:
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, amqError := amqp.Dial("amqp://localhost:5672/")
if amqError != nil {
panic(amqError)
}
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
q, err := ch.QueueDeclare(
"default", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := "{ \"body\":\"Hello...\", \"pattern\":\"test\", \"age\":\"20\"}"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
failOnError(err, "Failed to publish a message")
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
消费消息的 Nestjs
部分如下:
import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, Payload, RmqContext } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor() { }
@EventPattern("test")
getEventMessage(@Payload() data: any, @Ctx() context: RmqContext) {
console.log("data is -> ", data) // always undefined
console.log(
"content of message is -> ",
JSON.parse(
context.getMessage().content.toString() // from buffer to string
)
)
}
}
现在的问题是我无法从数据中获取消息而不是从 ctx
中解析它,我还需要在不跳过双引号的情况下以 json
的形式发送消息这个"\""
以下是对您问题的回复:
- 在 NextJS 的示例中,他们没有提供如何使用有效载荷数据,而是 say:
To access the original RabbitMQ message (with the properties, fields, and content), use the getMessage() method of the RmqContext
鉴于上述陈述,您正在正确解析队列中的消息。
- 避免手动发送正文字符串,您应该使用一个名为json Marhsal of struct 的过程,例如:
- 您应该创建一个包含要发送到队列的信息的结构
- 编组结构并生成
[]byte
type MessageQueue struct {
Body string `json:"body"`
Pattern string `json:"pattern"`
Age string `json:"age"`
Data string `json:"data"`
}
func NewMessageQueue(body, pattern, age string, data) *MessageQueue {
return &MessageQueue{
body, pattern, age, data
}
}
func (m *MessageQueue) Marshal() ([]byte, error) {
bytes, err := json.Marshal(m)
if err != nil {
return nil, err
}
return bytes, err
}
func main() {
...
message := NewMessageQueue("Hello...", "test", "20", "data...")
// TODO: check the error
body, _ := message.Marshal()
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: body,
},
)
...
}
更新:
- 控制器在
getEventMessage
方法上接收到的data
参数,它应该从 Golang 发送到 body 上以被Nestjs
反序列化。这意味着结构应如下所示:
type MessageQueue struct {
Body string `json:"body"`
Pattern string `json:"pattern"`
Age string `json:"age"`
Data string `json:"data"`
}