在 GraphQL 参考实现中,解析器应该 return 一个 Iterable,如何 return 一个异步 Iterable?

In the GraphQL reference implementation resolvers are expected to return an Iterable, how to return an asynchronous Iterable?

我正在使用 Sequelize to access my relational database and deliver the results in a GraphQL resolver. Queries within the Sequelize framework are executed asynchronously (bluebird)。为了 缓冲大的结果集 并避免 高内存需求 在服务器上,例如请求了数百万条记录,我想在我的解析器中返回一个迭代器。考虑这个简化的要点:

// root resolver
function allPersons(...) {
  [...]
  return {
    nextId: 1,
    maxId: 10000000, 
    [Symbol.iterator]: () => { return this },
    next: function() {
      let nextRes = { done: true, value: null }
      if (this.nextId <= this.maxId) {
        nextRes.value = sequelize.models.person.findById(this.currId)
        nextRes.done = false
        this.nextId = this.nextId + 1
      }
      return nextRes
    }
}

以上有效,因为 Sequelize 构造的 Promise 返回为 next()value。当 this value-Promise 被 resolved 时,它会从底层关系数据库中获取单个记录。于是,我同步构造了异步数据抓取。这只有效,因为每个单独的提取都独立于所有其他提取。特别是在执行下一个之前,不需要 awaited 单个提取。然而,逐行获取关系数据库在技术上是低效的,实际上是一种反模式。因此,我想实现一个缓冲区,它可以获取 10k 行的批次,为它们提供服务直到该批次为空,然后获取下一个。但是,由于当时引入了异步事件的依赖性,要实现这一点,就需要一个异步迭代器 (Symbol.asyncIterator).

我需要做什么才能使 GraphQL's reference implementation (graphql-js and/or express-graphql) 接受异步迭代器? 请注意,我想避免使用 Apollo GraphQL.

或者 Object-Stream 是一个可能的解决方案吗?

将不胜感激。

GraphQL.js 在后台使用 iterall。为了支持异步迭代,底层代码必须使用该库中的 forAwaitEach 方法,而不是现在使用的 forEach 方法。这可能是可行的,但我不确定它是否会破坏其他功能。

如果您只想获取一些任意大小的块中的所有 people,您不需要做任何特别花哨的事情:

async function getAllPeople () {
  const chunkSize = 10000
  const startId = 1
  const endId = await sequelize.models.person.max('id')
  const people = []

  let lower = startId
  let upper = startId + chunkSize

  while (upper < (endId + 1)) {
    const chunk = await sequelize.models.person.findAll({
      where: {
        id: {
          [Op.and]: {
            [Op.gte]: lower,
            [Op.lt]: upper,
          }
        }
      },
    })
    people.push(chunk)
    lower = lower + chunkSize
    upper = upper + chunkSize
  }

  return people
}

编辑: 要解决内存问题,您必须有效地将有效负载分解为多个响应,并有办法在客户端将它们放回一起。在 Apollo 的路线图上有一个 @stream 指令可以做到这一点,我认为有些人已经尝试过它,但我想我们可能还需要一段时间才能看到它的成熟实现。 @defer 有类似的机制,目前由 Apollo 支持,但在解析器级别工作,因此在这种情况下它不会真正有用。

您可以使用 subscriptions 破解它,顺便说一句,它确实使用了异步迭代器。您仍然可能需要使用查询或突变来触发发送数据,但随后可以通过订阅将其发送到客户端。

不幸的是,我认为给定当前工具的最简单解决方案是仅对查询实施分页并让客户端拼凑出总结果。

半个解决方案:使用流并将它们转换为同步迭代器

因为 GraphQL 解析器需要 return 同步迭代器,所以可以使用流将它们的数据馈送到这样的迭代器中。考虑问题中发布的原始示例的以下解决方案。请注意,这里使用了流行的 ORM Sequelize does not support streams and thus another node package knex

// Setup:
const knex = require('knex')
var dbCon = knex({
  client: 'pg',
  connection: {} // Define host, user, password, db (see knex docu)
})

// Get records as stream
var peopleStream = dbCon.select('*').from('people').stream()

// Serve stream within an synchronous iterator
var iter = {
  [Symbol.iterator]: () => {
    return this
  },
  next: function() {
    let v = peopleStream.read() || null
    console.log(JSON.stringify(v)) // Check, if it works.
    return {
      done: v === null,
      value: v
    }
  }
} 

然而,这确实只是解决方案的一半,因为只能以所示方式利用数据源来生成流 - 反过来又可以很容易地转换为同步迭代器如此处所示。在我看来,GraphQL 的参考实现迫切需要支持异步迭代器作为解析器的结果值。有关详细信息,请参阅 this feature request