Relay Modern:将 websocket 连接到网络层

Relay Modern: Connecting websocket to network layer

我在弄清楚如何将 Relay Modern 网络层与我的 websocket 实例连接时遇到问题。

我目前正在将一个 websocket 实例实例化为:

const subscriptionWebSocket = new ReconnectingWebSocket('ws://url.url/ws/subscriptions/', null, options);

我正在指定订阅并创建 requestSubscription 的新实例:

const subscription = graphql`
  subscription mainSubscription {
    testData {
      anotherNode {
        data
      }
    }
  }
`;

requestSubscription(
  environment,
  {
    subscription,
    variables: {},
    onComplete: () => {...},
    onError: (error) => {...},
    onNext: (response) => {...},
    updater: (updaterStoreConfig) => {...},
  },
);

然后允许我发送任何订阅请求:

function subscriptionHandler(subscriptionConfig, variables, cacheConfig, observer) {
  subscriptionWebSocket.send(JSON.stringify(subscriptionConfig.text));

  return {
    dispose: () => {
      console.log('subscriptionHandler: Disposing subscription');
    },
  };
}

const network = Network.create(fetchQuery, subscriptionHandler);

通过我的服务器(目前使用 Graphene-python),我能够在服务器上解释接收到的消息。

但是,我遇到的问题是如何回复订阅;例如,当我的数据库发生变化时,我想生成响应并 return 给任何潜在订阅者。

问题是,如何将 onMessage 事件从我的 websocket 实例连接到我的中继现代网络层?我浏览了中继的源代码,但似乎无法弄清楚什么回调,或者什么方法应该实现 onreceive。

如有任何提示,我们将不胜感激。

我认为 this repo 可以满足您的需求。 帮助您在服务器端创建订阅

在此线程中找到帮助后,我将写下我是如何解决这个问题的。它可能对其他人有用。这在很大程度上取决于您选择的服务器端解决方案。

我的做法:

首先,我构建了一个 SubscriptionHandler,它将通过 SubscriptionHandler#setupSubscription 处理 requestStream#subscribeFunction。

SubscriptionHandler 实例化 WebSocket(使用 ReconnectingWebSockets 的自定义版本)并将 onmessage 事件附加到内部方法 (SubscriptionHandler#receiveSubscriptionPayload),该方法将有效负载添加到相应的请求。

我们通过 SubscriptionHandler#newSubscription 创建新的订阅,它将使用内部属性 SubscriptionHandler.subscriptions 添加该订阅的键控条目(我们在查询和变量上使用 MD5 哈希实用程序);表示对象将显示为:

SubscriptionHandler.subscriptions = {
  [md5hash]: {
    query: QueryObject,
    variables: SubscriptionVariables,
    observer: Observer (contains OnNext method)
}

每当服务器发送订阅响应时,都会调用 SubscriptionHandler#receiveSubscriptionPayload 方法,它会使用 query/variables md5 哈希来识别有效负载属于哪个订阅,然后使用 SubscriptionHandler.subscriptions 观察者onNext 方法。

此方法要求服务器 return 一条消息,例如:

export type ServerResponseMessageParsed = {
  payload: QueryPayload,
  request: {
    query: string,
    variables: Object,
  }
}

我不知道这是否是处理订阅的好方法,但它现在适用于我当前的设置。

SubscriptionHandler.js

class SubscriptionHandler {
  subscriptions: Object;
  subscriptionEnvironment: RelayModernEnvironment;
  websocket: Object;

  /**
   * The SubscriptionHandler constructor. Will setup a new websocket and bind
   * it to internal method to handle receving messages from the ws server.
   *
   * @param  {string} websocketUrl      - The WebSocket URL to listen to.
   * @param  {Object} webSocketSettings - The options object.
   *                                      See ReconnectingWebSocket.
   */
  constructor(websocketUrl: string, webSocketSettings: WebSocketSettings) {
    // All subscription hashes and objects will be stored in the
    // this.subscriptions attribute on the subscription handler.
    this.subscriptions = {};

    // Store the given environment internally to be reused when registering new
    // subscriptions. This is required as per the requestRelaySubscription spec
    // (method requestSubscription).
    this.subscriptionEnvironment = null;

    // Create a new WebSocket instance to be able to receive messages on the
    // given URL. Always opt for default protocol for the RWS, second arg.
    this.websocket = new ReconnectingWebSocket(
      websocketUrl,
      null,  // Protocol.
      webSocketSettings,
    );

    // Bind an internal method to handle incoming messages from the websocket.
    this.websocket.onmessage = this.receiveSubscriptionPayload;
  }

  /**
   * Method to attach the Relay Environment to the subscription handler.
   * This is required as the Network needs to be instantiated with the
   * SubscriptionHandler's methods, and the Environment needs the Network Layer.
   *
   * @param  {Object} environment - The apps environment.
   */
  attachEnvironment = (environment: RelayModernEnvironment) => {
    this.subscriptionEnvironment = environment;
  }

  /**
   * Generates a hash from a given query and variable pair. The method
   * used is a recreatable MD5 hash, which is used as a 'key' for the given
   * subscription. Using the MD5 hash we can identify what subscription is valid
   * based on the query/variable given from the server.
   *
   * @param  {string} query     - A string representation of the subscription.
   * @param  {Object} variables - An object containing all variables used
   *                              in the query.
   * @return {string}             The MD5 hash of the query and variables.
   */
  getHash = (query: string, variables: HashVariables) => {
    const queryString = query.replace(/\s+/gm, '');
    const variablesString = JSON.stringify(variables);
    const hash = md5(queryString + variablesString).toString();
    return hash;
  }

  /**
   * Method to be bound to the class websocket instance. The method will be
   * called each time the WebSocket receives a message on the subscribed URL
   * (see this.websocket options).
   *
   * @param  {string} message - The message received from the websocket.
   */
  receiveSubscriptionPayload = (message: ServerResponseMessage) => {
    const response: ServerResponseMessageParsed = JSON.parse(message.data);
    const { query, variables } = response.request;
    const hash = this.getHash(query, variables);

    // Fetch the subscription instance from the subscription handlers stored
    // subscriptions.
    const subscription = this.subscriptions[hash];

    if (subscription) {
      // Execute the onNext method with the received payload after validating
      // that the received hash is currently stored. If a diff occurs, meaning
      // no hash is stored for the received response, ignore the execution.
      subscription.observer.onNext(response.payload);
    } else {
      console.warn(Received payload for unregistered hash: ${hash});
    }
  }

  /**
   * Method to generate new subscriptions that will be bound to the
   * SubscriptionHandler's environment and will be stored internally in the
   * instantiated handler object.
   *
   * @param {string} subscriptionQuery - The query to subscribe to. Needs to
   *                                     be a validated subscription type.
   * @param {Object} variables         - The variables for the passed query.
   * @param {Object} configs           - A subscription configuration. If
   *                                     override is required.
   */
  newSubscription = (
      subscriptionQuery: GraphQLTaggedNode,
      variables: Variables,
      configs: GraphQLSubscriptionConfig,
  ) => {
    const config = configs || DEFAULT_CONFIG;

    requestSubscription(
      this.subscriptionEnvironment,
      {
        subscription: subscriptionQuery,
        variables: {},
        ...config,
      },
    );
  }

  setupSubscription = (
    config: ConcreteBatch,
    variables: Variables,
    cacheConfig: ?CacheConfig,
    observer: Observer,
  ) => {
    const query = config.text;

    // Get the hash from the given subscriptionQuery and variables. Used to
    // identify this specific subscription.
    const hash = this.getHash(query, variables);

    // Store the newly created subscription request internally to be re-used
    // upon message receival or local data updates.
    this.subscriptions[hash] = { query, variables };

    const subscription = this.subscriptions[hash];
    subscription.observer = observer;

    // Temp fix to avoid WS Connection state.
    setTimeout(() => {
      this.websocket.send(JSON.stringify({ query, variables }));
    }, 100);
  }
}

const subscriptionHandler = new SubscriptionHandler(WS_URL, WS_OPTIONS);

export default subscriptionHandler;

我也成功订阅了 Relay Modern 作品,想分享我的最小设置,也许对某些人有帮助!

请注意,我没有使用 WebSocket,而是使用 subscriptions-transport-ws 中的 SubscriptionClient 来管理与服务器的连接。

这是我的最小设置代码:

Environment.js

import { SubscriptionClient } from 'subscriptions-transport-ws'
const {
  Environment,
  Network,
  RecordSource,
  Store,
} = require('relay-runtime')
const store = new Store(new RecordSource())


const fetchQuery = (operation, variables) => {
  return fetch('https://api.graph.cool/relay/v1/__GRAPHCOOL_PROJECT_ID__', {
    method: 'POST',
    headers: {
      'Accept': 'application/json',
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      query: operation.text,
      variables,
    }),
  }).then(response => {
    return response.json()
  })
}

const websocketURL = 'wss://subscriptions.graph.cool/v1/__GRAPHCOOL_PROJECT_ID__'

function setupSubscription(
  config,
  variables,
  cacheConfig,
  observer,
) {
  const query = config.text

  const subscriptionClient = new SubscriptionClient(websocketURL, {reconnect: true})
  const id = subscriptionClient.subscribe({query, variables}, (error, result) => {
    observer.onNext({data: result})
  })
}

const network = Network.create(fetchQuery, setupSubscription)
const environment = new Environment({
  network,
  store,
})

export default environment

NewLinkSubscription.js

import {
  graphql,
  requestSubscription
} from 'react-relay'
import environment from '../Environment'

const newLinkSubscription = graphql`
  subscription NewLinkSubscription {
    Link {
      mutation
      node {
        id
        description
        url
        createdAt
        postedBy {
          id
          name
        }
      }
    }
  }
`

export default (onNext, onError, onCompleted, updater) => {

  const subscriptionConfig = {
    subscription: newLinkSubscription,
    variables: {},
    onError,
    onNext,
    onCompleted,
    updater
  }

  requestSubscription(
    environment,
    subscriptionConfig
  )

}

现在您可以简单地使用导出的函数来订阅。例如,在 componentDidMount 中的一个 React 组件中,我现在可以执行以下操作:

componentDidMount() {
  NewLinkSubscription(
    response => console.log(`Received data: `, response),
    error => console.log(`An error occurred:`, error),
    () => console.log(`Completed`)
  )
}

请注意,SubscriptionClient 只能在您的服务器实现 this 协议时使用!

如果您想了解更多信息,请查看完整堆栈 How to GraphQL tutorial,其中详细描述了如何使用 Relay Modern 进行订阅。

对于最近遇到这个问题的任何人来说,由于所涉及的库最近更新,我在上述两种解决方案中都没有成功。然而,它们是一个很好的起点,我根据官方的 relay modern todo 示例整理了一个小示例,它非常简约,使用了 Apollo 的 helpers 库,但与 relay modern 配合得很好:

https://github.com/jeremy-colin/relay-examples-subscription

它包括服务器和客户端

希望能帮到你