NestJS MQTT 微服务的有效@MessagePattern 是什么?
What's a valid @MessagePattern for NestJS MQTT microservice?
我正在尝试根据 the docs 使用 NestJS 设置 MQTT 微服务。
我已经使用 Docker 启动了一个可用的 Mosquitto Broker,并使用各种 MQTT 客户端验证了它的可操作性。现在,当我启动 NestJS 服务时,它似乎连接正确(mqqt.fx 显示新客户端),但我无法在我的控制器中接收任何消息。
这是我的引导,就像在文档中一样:
main.ts
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
host: 'localhost',
port: 1883,
protocol: 'tcp'
}
});
app.listen(() => console.log('Microservice is listening'));
}
bootstrap();
app.controller.ts
@Controller()
export class AppController {
@MessagePattern('mytopic') // tried {cmd:'mytopic'} or {topic:'mytopic'}
root(msg: Buffer) {
console.log('received: ', msg)
}
}
我是不是错误地使用了消息模式装饰器,或者我对 NestJS MQTT 微服务应该做什么的概念是错误的?我认为它可能会订阅我传递给装饰器的主题。我唯一的其他信息来源是相应的 unit tests
文档不是很清楚,但似乎对于 mqtt,如果你有 @MessagePattern('mytopic')
,你可以发布关于主题 mytopic_ack
的命令,你会在 mytopic_res
上得到回应.我仍在尝试找出如何从服务发布到 mqtt 代理。
public getAckQueueName(pattern: string): string {
return `${pattern}_ack`;
}
public getResQueueName(pattern: string): string {
return `${pattern}_res`;
}
nest.js 模式处理程序
在 nest.js 端,我们有以下模式处理程序:
@MessagePattern('sum')
sum(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}
正如 所解释的那样,这实际上会收听 sum_ack
。
非nest.js客户
非nest.js 客户端可能看起来像这样(只需保存为 client.js、运行 npm install mqtt
和 运行 带有 [=16 的程序=]):
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://localhost:1883')
client.on('connect', function () {
client.subscribe('sum_res', function (err) {
if (!err) {
client.publish('sum_ack', '{"data": [2, 3]}');
}
})
})
client.on('message', function (topic, message) {
console.log(message.toString())
client.end()
})
它发送关于主题 sum_ack
的消息并收听关于 sum_res
的消息。当它在 sum_res
上收到消息时,它会记录消息并结束程序。 nest.js 期望消息格式为 {data: myData}
然后调用参数处理程序 sum(myData)
.
// Log:
{"err":null,"response":5} // This is the response from sum()
{"isDisposed":true} // Internal "complete event" (according to unit test)
当然,这样不是很方便...
nest.js 客户端
那是因为这是要与另一个 nest.js 客户端一起使用,而不是与普通的 mqtt 客户端一起使用。 nest.js 客户端抽象出所有内部逻辑。参见this answer,其中描述了redis的客户端(mqtt只需要改两行)。
async onModuleInit() {
await this.client.connect();
// no 'sum_ack' or {data: [0, 2, 3]} needed
this.client.send('sum', [0, 2, 3]).toPromise();
}
我今天和 MQTT 打架了,这对我有一点帮助,但我遇到了更多问题,你可以在下面看到我的发现:
代理配置方式错误URL
在我的例子中,当我使用 non-local MQTT 服务器时,我从这个开始:
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
host: 'test.mosquitto.org',
port: 1883,
protocol: 'tcp',
},
});
await app.listenAsync();
但是就像您可以读入 constructor of ServerMqtt they use url
option only (when not provided it fallbacks to 'mqtt://localhost:1883'
. While I do not have local MQTT it will never resolve app.listenAsync()
which is resolved only on connect 一样,也不会 运行 任何处理程序。
当我调整代码以使用 url
选项时它开始工作。
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://test.mosquitto.org:1883',
},
});
await app.listenAsync();
消息需要id
属性
第二个非常奇怪的问题是,当我使用来自@KimKern 的 Non-nest.js Client 脚本时,我必须注册两个 MessagePatterns:sum
和 sum_ack
:
@MessagePattern('sum')
sum(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}
@MessagePattern('sum_ack')
sumAck(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}
当我使用 console.log
时,我发现后者正在 运行 但只有当第一个存在时才存在。您可以使用 mqtt cli 工具将相同的消息推送到代理来检查它:
mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '{"data":[1,2]}'
但最大的问题是没有回复(发布sum_res).
解决方案是在发送消息时也提供 id
。
mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '{"data":[1,2], "id":"any-id"}'
然后我们可以删除 'sum_ack' MessagePattern 并仅保留此代码:
@MessagePattern('sum')
sum(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}
原因隐藏在 ServerMqtt 的 handleMessage
方法中,不会 publish response from a handler if a message didn't have id
。
TL/DR
仅使用 url
选项将 url 指定给消息代理,并始终为消息提供 id
。
我希望这会为其他人节省一些时间。
祝您黑客愉快!
@Tanas 是对的。 Nestjs/Microservice 现在收听您的 $[topic] 并回答 $[topic]/reply。后缀 _ack 和 _res 已弃用。
例如:
@MessagePattern('helloWorld')
getHello(): string {
console.log("hello world")
return this.appService.getHello();
}
现在收听主题:helloWorld
现在回复主题 helloWorld/reply
关于ID
你应该也在有效载荷中提供一个ID(见@Hakier),Nestjs将回复一个包含你的ID的答案。
如果你没有任何id,仍然不会有任何回复,但相应的逻辑仍然会触发。
例如(使用上面的片段):
你的留言:
{"data":"foo","id":"bar"}
Nestjs 回复:
{"response":"Hello World!","isDisposed":true,"id":"bar"}
无ID:
您的留言:
{"data":"foo"} or {}
没有回复,但在终端Hello World
我正在尝试根据 the docs 使用 NestJS 设置 MQTT 微服务。
我已经使用 Docker 启动了一个可用的 Mosquitto Broker,并使用各种 MQTT 客户端验证了它的可操作性。现在,当我启动 NestJS 服务时,它似乎连接正确(mqqt.fx 显示新客户端),但我无法在我的控制器中接收任何消息。 这是我的引导,就像在文档中一样:
main.ts
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
host: 'localhost',
port: 1883,
protocol: 'tcp'
}
});
app.listen(() => console.log('Microservice is listening'));
}
bootstrap();
app.controller.ts
@Controller()
export class AppController {
@MessagePattern('mytopic') // tried {cmd:'mytopic'} or {topic:'mytopic'}
root(msg: Buffer) {
console.log('received: ', msg)
}
}
我是不是错误地使用了消息模式装饰器,或者我对 NestJS MQTT 微服务应该做什么的概念是错误的?我认为它可能会订阅我传递给装饰器的主题。我唯一的其他信息来源是相应的 unit tests
文档不是很清楚,但似乎对于 mqtt,如果你有 @MessagePattern('mytopic')
,你可以发布关于主题 mytopic_ack
的命令,你会在 mytopic_res
上得到回应.我仍在尝试找出如何从服务发布到 mqtt 代理。
public getAckQueueName(pattern: string): string {
return `${pattern}_ack`;
}
public getResQueueName(pattern: string): string {
return `${pattern}_res`;
}
nest.js 模式处理程序
在 nest.js 端,我们有以下模式处理程序:
@MessagePattern('sum')
sum(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}
正如 sum_ack
。
非nest.js客户
非nest.js 客户端可能看起来像这样(只需保存为 client.js、运行 npm install mqtt
和 运行 带有 [=16 的程序=]):
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://localhost:1883')
client.on('connect', function () {
client.subscribe('sum_res', function (err) {
if (!err) {
client.publish('sum_ack', '{"data": [2, 3]}');
}
})
})
client.on('message', function (topic, message) {
console.log(message.toString())
client.end()
})
它发送关于主题 sum_ack
的消息并收听关于 sum_res
的消息。当它在 sum_res
上收到消息时,它会记录消息并结束程序。 nest.js 期望消息格式为 {data: myData}
然后调用参数处理程序 sum(myData)
.
// Log:
{"err":null,"response":5} // This is the response from sum()
{"isDisposed":true} // Internal "complete event" (according to unit test)
当然,这样不是很方便...
nest.js 客户端
那是因为这是要与另一个 nest.js 客户端一起使用,而不是与普通的 mqtt 客户端一起使用。 nest.js 客户端抽象出所有内部逻辑。参见this answer,其中描述了redis的客户端(mqtt只需要改两行)。
async onModuleInit() {
await this.client.connect();
// no 'sum_ack' or {data: [0, 2, 3]} needed
this.client.send('sum', [0, 2, 3]).toPromise();
}
我今天和 MQTT 打架了,这对我有一点帮助,但我遇到了更多问题,你可以在下面看到我的发现:
代理配置方式错误URL
在我的例子中,当我使用 non-local MQTT 服务器时,我从这个开始:
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
host: 'test.mosquitto.org',
port: 1883,
protocol: 'tcp',
},
});
await app.listenAsync();
但是就像您可以读入 constructor of ServerMqtt they use url
option only (when not provided it fallbacks to 'mqtt://localhost:1883'
. While I do not have local MQTT it will never resolve app.listenAsync()
which is resolved only on connect 一样,也不会 运行 任何处理程序。
当我调整代码以使用 url
选项时它开始工作。
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://test.mosquitto.org:1883',
},
});
await app.listenAsync();
消息需要id
属性
第二个非常奇怪的问题是,当我使用来自@KimKern 的 Non-nest.js Client 脚本时,我必须注册两个 MessagePatterns:sum
和 sum_ack
:
@MessagePattern('sum')
sum(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}
@MessagePattern('sum_ack')
sumAck(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}
当我使用 console.log
时,我发现后者正在 运行 但只有当第一个存在时才存在。您可以使用 mqtt cli 工具将相同的消息推送到代理来检查它:
mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '{"data":[1,2]}'
但最大的问题是没有回复(发布sum_res).
解决方案是在发送消息时也提供 id
。
mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '{"data":[1,2], "id":"any-id"}'
然后我们可以删除 'sum_ack' MessagePattern 并仅保留此代码:
@MessagePattern('sum')
sum(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}
原因隐藏在 ServerMqtt 的 handleMessage
方法中,不会 publish response from a handler if a message didn't have id
。
TL/DR
仅使用 url
选项将 url 指定给消息代理,并始终为消息提供 id
。
我希望这会为其他人节省一些时间。
祝您黑客愉快!
@Tanas 是对的。 Nestjs/Microservice 现在收听您的 $[topic] 并回答 $[topic]/reply。后缀 _ack 和 _res 已弃用。
例如:
@MessagePattern('helloWorld')
getHello(): string {
console.log("hello world")
return this.appService.getHello();
}
现在收听主题:helloWorld
现在回复主题 helloWorld/reply
关于ID
你应该也在有效载荷中提供一个ID(见@Hakier),Nestjs将回复一个包含你的ID的答案。 如果你没有任何id,仍然不会有任何回复,但相应的逻辑仍然会触发。
例如(使用上面的片段):
你的留言:
{"data":"foo","id":"bar"}
Nestjs 回复:
{"response":"Hello World!","isDisposed":true,"id":"bar"}
无ID:
您的留言:
{"data":"foo"} or {}
没有回复,但在终端Hello World