Apollo GraphQL: MQTT 订阅一个代理只提供发布的数据

Apollo GraphQL: MQTT Subscribe to a Broker to just provide the published data

场景

我有一个传感器节点,它发布有关特定 MQTT 主题的信息(发送到 Mosquitto 代理)。发送的数据是纯字符串。

后端

目前我正在使用 apollo-server-express 构建 GraphQL 服务器。我希望使用 `graphql-mqtt-subscriptions 来:

dependencies

"dependencies": {
    "apollo-server-express": "^2.8.1",
    "express": "^4.17.1",
    "graphql": "^14.4.2",
    "graphql-mqtt-subscriptions": "^1.1.0",
    "graphql-subscriptions": "^1.1.0",
    "graphql-tools": "^4.0.5",
    "mqtt": "^3.0.0",
    "subscriptions-transport-ws": "^0.9.16"
  },

代码片段

入口点server.js代码:

import express from 'express';
import {ApolloServer } from 'apollo-server-express';
import { typeDefs } from './graphql/schema';
import { resolvers } from './graphql/resolvers';
import { createServer } from 'http';


const server = new ApolloServer({ typeDefs, resolvers});

const app = express();

server.applyMiddleware({ app });

const httpServer = createServer(app);

server.installSubscriptionHandlers(httpServer);

httpServer.listen({port: 4000}, () => {
    console.log(` Server ready at http://localhost:4000/${server.graphqlPath}`)
    console.log(` Subscriptions ready at ws://localhost:4000/${server.subscriptionsPath}`)
});

GraphQL 的 typeDefs 架构如下:


type Result {
        data: String
}

type Subscription {
        siteAdded(topic: String): Result
    }

schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}

其中 siteAdded(topic: String) 将采用 MQTT 需要订阅的主题。示例:

subscription {
   siteAdded(topic: "test/1/env") {
       data
   }

resolvers.js 如下所示(如 5 月文档中所述):

import { MQTTPubSub } from 'graphql-mqtt-subscriptions';
import { connect } from 'mqtt';

const client = connect('mqtt://my.mqtt.broker.ip.address', {
    reconnectPeriod: 1000,
});

const pubsub = new MQTTPubSub({
    client
});

export const resolvers: {
Subscription: {
        siteAdded: {
            subscribe: (_, args) => {
                console.log(args.topic); // to check if this gets called or not.
                pubsub.asyncIterator([args.topic]);

            }
        }
    }
};

推理

args.topic 上的 console.log 被调用,但之后 graphiql 中出现以下错误:

{
  "error": {
    "message": "Subscription field must return Async Iterable. Received: undefined"
  }
}

如果我执行 return pubsub.asyncIterator():

它提供来自 Broker 的及时数据,但输出是 null:

{
  "data": {
    "siteAdded": null
  }
}

我根据Apollo Docs

在上面提到的server.js中添加了Websockets中间件

我哪里出错了以及如何将来自订阅主题的数据添加到 graphiql

总结

更新

注意事项

  • NPM Registry graphql-mqtt-subscriptions v1.1.0 中的 +# 等通配符不可用。但是,存储库已经有了实现。存储库的所有者需要更新注册表。参见 Open Issue for graphql-mqtt-subscriptions

  • 我目前正在使用 MQTT 订阅的完整主题,以便从传感器获取数据,例如test/1/env 相对于 test/+/env

开发更新

  • 之前我以原始字符串格式(纯文本)从传感器发送数据,因此我更新了固件以在 JSON 字符串中发送数据,如下所示:

      {"data": "temp=23,humid=56 1500394302"}
    

解决方案

  1. 正如@Dom 和@DanielRearden 在评论中提到的,如果我使用大括号 {},我最初忘记添加 return。例如:

        Subscription: {
        siteAdded: {
            subscribe: (_, args) => {
                console.log(args.topic); // to check if this gets called or not.
                return pubsub.asyncIterator([args.topic]);
    
            }
        }
    }
    

    或者我只是删除括号和 return 通过编写解析器如下:

        Subscription: {
           siteAdded: {
               subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
           }
        }
    

    如查询中所述,这仍在返回我 null

  2. 我能够按照 Apollo 的 Payload Transformation 文档从订阅中获取数据,在我的解析器中我执行了以下操作:

       Subscription: {
        siteAdded: {
            resolve: (payload) => {
                return {
                    data: payload.data,
                };
            },
            subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
        }
    }
    

    必须为架构相应地解析有效负载。

结果

现在按如下方式订阅非常有效:

    subscription {
        siteAdded(topic: "test/1/env") {
            data
        }
    }

提供以下结果:

{
  "data": {
    "siteAdded": {
      "data": "temp=27.13,humid=43.33 1565345004"
    }
  }
}