使用服务器发送的事件和 EventSource 的 GraphQL 订阅

GraphQL subscription using server-sent events & EventSource

我正在研究使用服务器发送的事件作为支持 api。

来实现 "subscription" 类型

我纠结的是接口,更准确地说,是这种操作的http层。

问题:

使用原生EventSource不支持:

  1. 指定HTTP方法,默认使用"GET"。
  2. 包括有效负载(GraphQL 查询)

虽然#1 是无可辩驳的,但可以使用查询参数来规避#2。

查询参数限制为 ~2000 个字符(可以讨论) 这使得完全依赖它们感觉太脆弱了。

我想到的解决方案是为每个可能的事件创建一个专用端点。

例如:表示各方之间已完成交易的事件的 URI:

/graphql/transaction-status/$ID

将在服务器中转换为此查询:

subscription TransactionStatusSubscription {
    status(id: $ID) {
        ready
    }
}

这种方法的问题是:

  1. 为每个 URI 到 GraphQL 的转换创建一个处理程序。
  2. 部署新版本的服务器
  3. 失去 GraphQL 提供的灵活性 -> 客户端应该控制查询
  4. 跟踪代码库中的所有端点(后端、前端、移动端)

我可能遗漏了更多问题。

您是否可以想到更好的方法? 一种允许使用 EventSource 提供请求负载的更好方法?

GraphQL 中的订阅通常使用 WebSockets 实现,而不是 SSE。 Apollo 和 Relay 都支持使用 subscriptions-transport-ws client-side to listen for events. Apollo Server includes built-in support 进行使用 WebSockets 的订阅。如果您只是想实现订阅,最好使用这些现有解决方案之一。

就是说,有一个库可以利用 SSE 进行订阅 here。它看起来不再维护了,但是如果您一心想让 SSE 工作,您可以浏览源代码以获得一些想法。查看源代码,作者似乎通过使用 POST 请求 returns 订阅 ID 初始化每个订阅来绕过您上面提到的限制。

如果您使用的是 Apollo,它们支持 automatic persisted queries(文档中缩写为 APQ)。如果您不使用 Apollo,那么使用任何语言的实现都不会太差。我建议遵循他们的约定,这样您的客户就可以根据需要使用 Apollo。

任何客户端第一次使用查询的哈希发出 EventSource 请求时,它将失败,然后使用完整的有效负载重试请求到常规 GraphQL 端点。如果在服务器上启用了 APQ,则来自所有带有查询参数的客户端的后续 GET 请求将按计划执行。

一旦你解决了这个问题,你只需要为 GraphQL 做一个服务器发送的事件传输(考虑到 subscribe 函数 returns an AsyncIterator 应该很容易)

我正在考虑在我的公司这样做,因为一些前端开发人员喜欢 EventSource 处理起来多么容易。

这里有两件事在起作用:SSE 连接和 GraphQL 端点。端点有一个规范要遵循,所以仅仅从订阅请求返回 SSE 并没有完成,无论如何都需要一个 GET 请求。所以两者必须分开。

让客户端通过 /graphql-sse 打开一个 SSE 通道怎么样,这会创建一个通道令牌。使用此令牌,客户端随后可以请求订阅,事件将通过所选频道到达。

令牌可以作为 SSE 通道上的第一个事件发送,并将令牌传递给查询,它可以由客户端在 cookie、请求 header 甚至未使用的文件中提供查询变量。

或者,服务器可以将最后打开的频道存储在 session 存储中(将客户端限制为单个频道)。

如果没有找到频道,则查询失败。如果通道关闭,客户端可以再次打开它,并在查询中传递令牌 string/cookie/header 或让 session 存储处理它。

截至目前,您通过 SSE 订阅了多个 GraphQL 包。

graphql-sse

提供客户端和服务器以通过 SSE 使用 GraphQL 订阅。这个包有一个专门的订阅处理程序。

这里是 express 的用法示例。

import express from 'express'; // yarn add express
import { createHandler } from 'graphql-sse';

// Create the GraphQL over SSE handler
const handler = createHandler({ schema });

// Create an express app serving all methods on `/graphql/stream`
const app = express();
app.use('/graphql/stream', handler);

app.listen(4000);
console.log('Listening to port 4000');

@graphql-sse/server

为 GraphQL 订阅提供服务器处理程序。但是,HTTP 处理取决于您使用的框架。

免责声明:我是 @graphql-sse packages

的作者

这是一个使用 express 的例子。

import express, { RequestHandler } from "express";
import {
  getGraphQLParameters,
  processSubscription,
} from "@graphql-sse/server";
import { schema } from "./schema";

const app = express();

app.use(express.json());

app.post(path, async (req, res, next) => {
    const request = {
        body: req.body,
        headers: req.headers,
        method: req.method,
        query: req.query,
    };

    const { operationName, query, variables } = getGraphQLParameters(request);
    if (!query) {
        return next();
    }
    const result = await processSubscription({
        operationName,
        query,
        variables,
        request: req,
        schema,
    });

    if (result.type === RESULT_TYPE.NOT_SUBSCRIPTION) {
        return next();
    } else if (result.type === RESULT_TYPE.ERROR) {
        result.headers.forEach(({ name, value }) => res.setHeader(name, value));
        res.status(result.status);
        res.json(result.payload);
    } else if (result.type === RESULT_TYPE.EVENT_STREAM) {
        res.writeHead(200, {
            'Content-Type': 'text/event-stream',
            Connection: 'keep-alive',
            'Cache-Control': 'no-cache',
        });

        result.subscribe((data) => {
            res.write(`data: ${JSON.stringify(data)}\n\n`);
        });

        req.on('close', () => {
            result.unsubscribe();
        });
    }
});

客户

上面提到的两个包都有配套的客户端。由于 EventSource API 的限制,这两个包都实现了一个自定义客户端,该客户端提供用于发送 HTTP Headers 的选项、带有 post 的负载、EvenSource 是什么API不支持。 graphql-sse 与其客户端一起提供,而 @graphql-sse/server 在单独的包中有配套客户端。

graphql-sse 客户端示例

import { createClient } from 'graphql-sse';

const client = createClient({
  // singleConnection: true, use "single connection mode" instead of the default "distinct connection mode"
  url: 'http://localhost:4000/graphql/stream',
});

// query

  const result = await new Promise((resolve, reject) => {
    let result;
    client.subscribe(
      {
        query: '{ hello }',
      },
      {
        next: (data) => (result = data),
        error: reject,
        complete: () => resolve(result),
      },
    );
  });


 // subscription

  const onNext = () => {
    /* handle incoming values */
  };

  let unsubscribe = () => {
    /* complete the subscription */
  };

  await new Promise((resolve, reject) => {
    unsubscribe = client.subscribe(
      {
        query: 'subscription { greetings }',
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
    );
  });

;

@graphql-sse/client

@graphql-sse/server 的同伴。

例子

import {
  SubscriptionClient,
  SubscriptionClientOptions,
} from '@graphql-sse/client';

const subscriptionClient = SubscriptionClient.create({
    graphQlSubscriptionUrl: 'http://some.host/graphl/subscriptions'
});

const subscription = subscriptionClient.subscribe(
    {
        query: 'subscription { greetings }',
    }
)

const onNext = () => {
    /* handle incoming values */
  };

const onError = () => {
    /* handle incoming errors */
  };

subscription.susbscribe(onNext, onError)

@gaphql-sse/apollo-client

Apollo Client @graph-sse/server 软件包的配套软件包。

import { split, HttpLink, ApolloClient, InMemoryCache } from '@apollo/client';
import { getMainDefinition } from '@apollo/client/utilities';
import { ServerSentEventsLink } from '@graphql-sse/apollo-client';

const httpLink = new HttpLink({
  uri: 'http://localhost:4000/graphql',
});

const sseLink = new ServerSentEventsLink({
  graphQlSubscriptionUrl: 'http://localhost:4000/graphql',
});

const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    );
  },
  sseLink,
  httpLink
);

export const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache(),
});