Apollo GraphQL: MQTT 订阅一个代理只提供发布的数据
Apollo GraphQL: MQTT Subscribe to a Broker to just provide the published data
场景
我有一个传感器节点,它发布有关特定 MQTT 主题的信息(发送到 Mosquitto 代理)。发送的数据是纯字符串。
后端
目前我正在使用 apollo-server-express
构建 GraphQL 服务器。我希望使用 `graphql-mqtt-subscriptions 来:
- 订阅 MQTT 代理
- 阅读特定主题的信息,然后 return 将其发送到
graphiql
UI
dependencies
"dependencies": {
"apollo-server-express": "^2.8.1",
"express": "^4.17.1",
"graphql": "^14.4.2",
"graphql-mqtt-subscriptions": "^1.1.0",
"graphql-subscriptions": "^1.1.0",
"graphql-tools": "^4.0.5",
"mqtt": "^3.0.0",
"subscriptions-transport-ws": "^0.9.16"
},
代码片段
入口点server.js
代码:
import express from 'express';
import {ApolloServer } from 'apollo-server-express';
import { typeDefs } from './graphql/schema';
import { resolvers } from './graphql/resolvers';
import { createServer } from 'http';
const server = new ApolloServer({ typeDefs, resolvers});
const app = express();
server.applyMiddleware({ app });
const httpServer = createServer(app);
server.installSubscriptionHandlers(httpServer);
httpServer.listen({port: 4000}, () => {
console.log(` Server ready at http://localhost:4000/${server.graphqlPath}`)
console.log(` Subscriptions ready at ws://localhost:4000/${server.subscriptionsPath}`)
});
GraphQL 的 typeDefs
架构如下:
type Result {
data: String
}
type Subscription {
siteAdded(topic: String): Result
}
schema {
query: Query
mutation: Mutation
subscription: Subscription
}
其中 siteAdded(topic: String)
将采用 MQTT 需要订阅的主题。示例:
subscription {
siteAdded(topic: "test/1/env") {
data
}
resolvers.js
如下所示(如 5 月文档中所述):
import { MQTTPubSub } from 'graphql-mqtt-subscriptions';
import { connect } from 'mqtt';
const client = connect('mqtt://my.mqtt.broker.ip.address', {
reconnectPeriod: 1000,
});
const pubsub = new MQTTPubSub({
client
});
export const resolvers: {
Subscription: {
siteAdded: {
subscribe: (_, args) => {
console.log(args.topic); // to check if this gets called or not.
pubsub.asyncIterator([args.topic]);
}
}
}
};
推理
args.topic
上的 console.log
被调用,但之后 graphiql
中出现以下错误:
{
"error": {
"message": "Subscription field must return Async Iterable. Received: undefined"
}
}
如果我执行 return pubsub.asyncIterator()
:
它提供来自 Broker 的及时数据,但输出是 null
:
{
"data": {
"siteAdded": null
}
}
我根据Apollo Docs
在上面提到的server.js
中添加了Websockets中间件
我哪里出错了以及如何将来自订阅主题的数据添加到 graphiql
?
总结
更新
graphql-mqtt-subscriptions
: v1.2.0
现在提供通配符解析支持
-
注意事项
NPM Registry graphql-mqtt-subscriptions
v1.1.0 中的 +
和 #
等通配符不可用。但是,存储库已经有了实现。存储库的所有者需要更新注册表。参见 Open Issue for graphql-mqtt-subscriptions
我目前正在使用 MQTT 订阅的完整主题,以便从传感器获取数据,例如test/1/env
相对于 test/+/env
开发更新
之前我以原始字符串格式(纯文本)从传感器发送数据,因此我更新了固件以在 JSON 字符串中发送数据,如下所示:
{"data": "temp=23,humid=56 1500394302"}
解决方案
正如@Dom 和@DanielRearden 在评论中提到的,如果我使用大括号 {}
,我最初忘记添加 return
。例如:
Subscription: {
siteAdded: {
subscribe: (_, args) => {
console.log(args.topic); // to check if this gets called or not.
return pubsub.asyncIterator([args.topic]);
}
}
}
或者我只是删除括号和 return
通过编写解析器如下:
Subscription: {
siteAdded: {
subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
}
}
如查询中所述,这仍在返回我 null
。
我能够按照 Apollo 的 Payload Transformation 文档从订阅中获取数据,在我的解析器中我执行了以下操作:
Subscription: {
siteAdded: {
resolve: (payload) => {
return {
data: payload.data,
};
},
subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
}
}
必须为架构相应地解析有效负载。
结果
现在按如下方式订阅非常有效:
subscription {
siteAdded(topic: "test/1/env") {
data
}
}
提供以下结果:
{
"data": {
"siteAdded": {
"data": "temp=27.13,humid=43.33 1565345004"
}
}
}
场景
我有一个传感器节点,它发布有关特定 MQTT 主题的信息(发送到 Mosquitto 代理)。发送的数据是纯字符串。
后端
目前我正在使用 apollo-server-express
构建 GraphQL 服务器。我希望使用 `graphql-mqtt-subscriptions 来:
- 订阅 MQTT 代理
- 阅读特定主题的信息,然后 return 将其发送到
graphiql
UI
dependencies
"dependencies": {
"apollo-server-express": "^2.8.1",
"express": "^4.17.1",
"graphql": "^14.4.2",
"graphql-mqtt-subscriptions": "^1.1.0",
"graphql-subscriptions": "^1.1.0",
"graphql-tools": "^4.0.5",
"mqtt": "^3.0.0",
"subscriptions-transport-ws": "^0.9.16"
},
代码片段
入口点server.js
代码:
import express from 'express';
import {ApolloServer } from 'apollo-server-express';
import { typeDefs } from './graphql/schema';
import { resolvers } from './graphql/resolvers';
import { createServer } from 'http';
const server = new ApolloServer({ typeDefs, resolvers});
const app = express();
server.applyMiddleware({ app });
const httpServer = createServer(app);
server.installSubscriptionHandlers(httpServer);
httpServer.listen({port: 4000}, () => {
console.log(` Server ready at http://localhost:4000/${server.graphqlPath}`)
console.log(` Subscriptions ready at ws://localhost:4000/${server.subscriptionsPath}`)
});
GraphQL 的 typeDefs
架构如下:
type Result {
data: String
}
type Subscription {
siteAdded(topic: String): Result
}
schema {
query: Query
mutation: Mutation
subscription: Subscription
}
其中 siteAdded(topic: String)
将采用 MQTT 需要订阅的主题。示例:
subscription {
siteAdded(topic: "test/1/env") {
data
}
resolvers.js
如下所示(如 5 月文档中所述):
import { MQTTPubSub } from 'graphql-mqtt-subscriptions';
import { connect } from 'mqtt';
const client = connect('mqtt://my.mqtt.broker.ip.address', {
reconnectPeriod: 1000,
});
const pubsub = new MQTTPubSub({
client
});
export const resolvers: {
Subscription: {
siteAdded: {
subscribe: (_, args) => {
console.log(args.topic); // to check if this gets called or not.
pubsub.asyncIterator([args.topic]);
}
}
}
};
推理
args.topic
上的 console.log
被调用,但之后 graphiql
中出现以下错误:
{
"error": {
"message": "Subscription field must return Async Iterable. Received: undefined"
}
}
如果我执行 return pubsub.asyncIterator()
:
它提供来自 Broker 的及时数据,但输出是 null
:
{
"data": {
"siteAdded": null
}
}
我根据Apollo Docs
在上面提到的server.js
中添加了Websockets中间件
我哪里出错了以及如何将来自订阅主题的数据添加到 graphiql
?
总结
更新
graphql-mqtt-subscriptions
:v1.2.0
现在提供通配符解析支持
注意事项
NPM Registry
graphql-mqtt-subscriptions
v1.1.0 中的+
和#
等通配符不可用。但是,存储库已经有了实现。存储库的所有者需要更新注册表。参见 Open Issue forgraphql-mqtt-subscriptions
我目前正在使用 MQTT 订阅的完整主题,以便从传感器获取数据,例如
test/1/env
相对于test/+/env
开发更新
之前我以原始字符串格式(纯文本)从传感器发送数据,因此我更新了固件以在 JSON 字符串中发送数据,如下所示:
{"data": "temp=23,humid=56 1500394302"}
解决方案
正如@Dom 和@DanielRearden 在评论中提到的,如果我使用大括号
{}
,我最初忘记添加return
。例如:Subscription: { siteAdded: { subscribe: (_, args) => { console.log(args.topic); // to check if this gets called or not. return pubsub.asyncIterator([args.topic]); } } }
或者我只是删除括号和
return
通过编写解析器如下:Subscription: { siteAdded: { subscribe: (_, args) => pubsub.asyncIterator([args.topic]), } }
如查询中所述,这仍在返回我
null
。我能够按照 Apollo 的 Payload Transformation 文档从订阅中获取数据,在我的解析器中我执行了以下操作:
Subscription: { siteAdded: { resolve: (payload) => { return { data: payload.data, }; }, subscribe: (_, args) => pubsub.asyncIterator([args.topic]), } }
必须为架构相应地解析有效负载。
结果
现在按如下方式订阅非常有效:
subscription {
siteAdded(topic: "test/1/env") {
data
}
}
提供以下结果:
{
"data": {
"siteAdded": {
"data": "temp=27.13,humid=43.33 1565345004"
}
}
}