如何正确使用 GraphQL 订阅?

How to use GraphQL subscription correctly?

我有一个 GraphQL 支持的应用程序。查询和变异部分运行良好。我尝试添加 GraphQL 订阅。

服务端GraphQL订阅部分代码灵感来自apollographql/subscriptions-transport-ws.

readme中的demo

另请查看代码中的注释以了解更多详细信息。

import Koa from 'koa';
import Router from 'koa-router';
import graphqlHTTP from 'koa-graphql';
import asyncify from 'callback-to-async-iterator';
import { SubscriptionServer } from 'subscriptions-transport-ws';
import firebase from 'firebase-admin';
import { execute, subscribe } from 'graphql';
import { GraphQLObjectType, GraphQLString } from 'graphql';

const MeType = new GraphQLObjectType({
  name: 'Me',
  fields: () => ({
    name: { type: GraphQLString },
    // ...
  }),
});

const listenMe = async (callback) => {
  // Below the firebase API returns real-time data
  return firebase
    .database()
    .ref('/users/123')
    .on('value', (snapshot) => {
      // snapshot.val() returns an Object including name field.
      // Here I tested is correct, it always returns { name: 'Rose', ... }
      // when some other fields inside got updated in database.
      return callback(snapshot.val());
    });
};

const Subscription = new GraphQLObjectType({
  name: 'Subscription',
  fields: () => ({
    meChanged: {
      type: MeType,
      subscribe: () => asyncify(listenMe),
    },
  }),
});

const schema = new GraphQLSchema({
  query: Query,
  mutation: Mutation,
  subscription: Subscription,
});

const app = new Koa();
app
  .use(new Router()
    .post('/graphql', async (ctx) => {
      // ...

      await graphqlHTTP({
        schema,
        graphiql: true,
      })(ctx);
    })
    .routes());

const server = app.listen(3009);

SubscriptionServer.create(
  {
    schema,
    execute,
    subscribe,
  },
  {
    server,
    path: '/subscriptions',
  },
);

我正在使用 Altair GraphQL Client 进行测试,因为它支持 GraphQL 订阅。

如截图所示,每次数据库中的数据发生变化时,它都会获取新数据。

然而,meChangednull,它不会抛出任何错误。任何的想法?谢谢

终于有一个新的库可以在没有完整的 Apollo 框架的情况下完成工作。

https://github.com/enisdenjo/graphql-ws

以下是我成功的代码:

服务器(GraphQL 模式定义语言)

import { useServer } from 'graphql-ws/lib/use/ws';
import WebSocket from 'ws';
import { buildSchema } from 'graphql';

const schema = buildSchema(`
  type Subscription {
    greeting: String
  }
`);

const roots = {
  subscription: {
    greeting: async function* sayHiIn5Languages() {
      for (const hi of ['Hi', 'Bonjour', 'Hola', 'Ciao', 'Zdravo']) {
        yield { greeting: hi };
      }
    },
  },
};

const wsServer = new ws.Server({
  server, // Your HTTP server
  path: '/graphql',
});
useServer(
  {
    schema,
    execute,
    subscribe,
    roots,
  },
  wsServer
);

服务器(GraphQL.js GraphQLSchema 对象方式)

import { execute, subscribe, GraphQLObjectType, GraphQLSchema, GraphQLString } from 'graphql';
import { useServer } from 'graphql-ws/lib/use/ws';
import WebSocket from 'ws';
import { PubSub } from 'graphql-subscriptions';

const pubsub = new PubSub();

const subscription = new GraphQLObjectType({
  name: 'Subscription',
  fields: {
    greeting: {
      type: GraphQLString,
      resolve: (source) => {
        if (source instanceof Error) {
          throw source;
        }
        return source.greeting;
      },
      subscribe: () => {
        return pubsub.asyncIterator('greeting');
      },
    },
  },
});

const schema = new GraphQLSchema({
  query,
  mutation,
  subscription,
});

setInterval(() => {
  pubsub.publish('greeting', {
    greeting: 'Bonjour',
  });
}, 1000);

const wsServer = new ws.Server({
  server, // Your HTTP server
  path: '/graphql',
});
useServer(
  {
    schema,
    execute,
    subscribe,
    roots,
  },
  wsServer
);

客户端

import { createClient } from 'graphql-ws';

const client = createClient({
  url: 'wss://localhost:5000/graphql',
});

client.subscribe(
  {
    query: 'subscription { greeting }',
  },
  {
    next: (data) => {
      console.log('data', data);
    },
    error: (error) => {
      console.error('error', error);
    },
    complete: () => {
      console.log('no more greetings');
    },
  }
);

披露:我与图书馆无关。