如何从 go 传递消息并使用 rabbitmq 从 nestjs 消费它?

how to pass a message from go and consuming it from nestjs with rabbitmq?

我有一个 go 服务向 rabbitmq 发布消息,负责该部分的代码如下:

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func main() {
    conn, amqError := amqp.Dial("amqp://localhost:5672/")
    if amqError != nil {
        panic(amqError)
    }

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    q, err := ch.QueueDeclare(
        "default", // name
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    body := "{ \"body\":\"Hello...\", \"pattern\":\"test\",  \"age\":\"20\"}"

    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        },
    )
    failOnError(err, "Failed to publish a message")

}

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

消费消息的 Nestjs 部分如下:

import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, Payload, RmqContext } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor() { }

  @EventPattern("test")
  getEventMessage(@Payload() data: any, @Ctx() context: RmqContext) {
    console.log("data is -> ", data) // always undefined
    console.log(
      "content of message is -> ",
      JSON.parse(
        context.getMessage().content.toString() // from buffer to string
      )
    )
  }
}

现在的问题是我无法从数据中获取消息而不是从 ctx 中解析它,我还需要在不跳过双引号的情况下以 json 的形式发送消息这个"\""

以下是对您问题的回复:

  1. 在 NextJS 的示例中,他们没有提供如何使用有效载荷数据,而是 say:

To access the original RabbitMQ message (with the properties, fields, and content), use the getMessage() method of the RmqContext

鉴于上述陈述,您正在正确解析队列中的消息。

  1. 避免手动发送正文字符串,您应该使用一个名为json Marhsal of struct 的过程,例如:
  • 您应该创建一个包含要发送到队列的信息的结构
  • 编组结构并生成 []byte

type MessageQueue struct {
    Body    string `json:"body"`
    Pattern string `json:"pattern"`
    Age     string `json:"age"`
    Data    string `json:"data"`
}

func NewMessageQueue(body, pattern, age string, data) *MessageQueue {
    return &MessageQueue{
        body, pattern, age, data
    }
}

func (m *MessageQueue) Marshal() ([]byte, error) {
    bytes, err := json.Marshal(m)

    if err != nil {
        return nil, err
    }
    return bytes, err
}

func main() {
    ...

    message := NewMessageQueue("Hello...", "test", "20", "data...")
    // TODO: check the error
    body, _ := message.Marshal()

    err = ch.Publish(
        "",         // exchange
        q.Name,     // routing key
        false,      // mandatory
        false,      // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        body,
        },
    )

    ...
}

更新:

  1. 控制器在 getEventMessage 方法上接收到的 data 参数,它应该从 Golang 发送到 body 上以被 Nestjs 反序列化。这意味着结构应如下所示:

type MessageQueue struct {
    Body    string `json:"body"`
    Pattern string `json:"pattern"`
    Age     string `json:"age"`
    Data    string `json:"data"`
}