在发布订阅模型中将回调转换为 Promise

Convert Callback to Promise in Publish Subscribe Model

假设我们有一个名为 A 的服务,它的功能是 subscribe(callback)。从服务订阅是一个开放的连接,可以随时接收数据,可以通过回调访问数据。 我们可以将此回调转换为 promise 吗?如果可以,怎么做?

样本

A.subscribe((error, data) => {
    // do something with data
});

Can we convert this callback to promise?

不仅仅是一个承诺,不,因为一个承诺只能一次,只有一个履行价值(成功时) ,但您有一个 系列 值(通常称为“可观察值”)。因此,您不能将其转换为 return 承诺,除非您当然希望承诺仅使用 一个 值(例如,第一个)来实现。

您可以将其转换为异步迭代器(也许使用 async 生成器函数)。顾名思义,异步迭代器通过 return 一个 系列 承诺异步提供一系列值。我不能说异步迭代器是否适用于您的用例,但它是我想到的最接近基于承诺的可观察对象。

Here's 为 Angular 的可观察对象设计的可观察对象到异步迭代器的实现,但可以根据需要进行调整。 (遗憾的是,没有指明许可证,所以我无法将其复制到答案中。)

双工流

正如 TJ 所建议的,您不能用 单个 承诺表示订阅。但是,您只需要一个变量和重新创建承诺的能力。我使用了一种类似于 TJ link 中描述的技术,但在抽象方面有显着差异。下面的 duplexStream 为程序中的任何流消费者提供了一个 read 函数,以及一个用于流生产者的 write 函数 -

function duplexStream () {
  let t = defer()
  async function* read () {
    while (true) yield await t.deferred
  }
  function write (err, value) {
    if (err) t.reject(err)
    else t.resolve(value)
    t = defer()
  }
  return [read, write]
}

defer 是提供外部控制承诺的简单抽象 -

function defer () {
  let resolve, reject
  return { deferred: new Promise((res, rej) => (resolve = res, reject = rej)), resolve, reject }
}

假设我们有两个 HTML 元素 -

<p id="foo"></p>
<p id="bar"></p>

频道

duplexStream 是通用的,不依赖于特定的库或 class。我们现在可以使用它将可观察对象转换为异步迭代器。注意 read 可以被多次调用,所有读者都将获得传递给 write 的值。 write 也可以给任意数量的生产者。通过向调用者提供 readwrite 作为通用函数,可以在普通 publishsubscribe 事件期间管理这些处理程序。

duplexStream 之上进行抽象,我们使用熟悉的 publishsubscribe 方法创建了一个 channel -

function channel ([read, write] = duplexStream()) {
  return {
    publish(v) {
      if (v instanceof Error)
        write(v)
      else
        write(null, v)
    },
    async subscribe(onValue, onError = console.error) {
      while (true)
        try {
          for await (const v of read())
            onValue(v)
        }
        catch (err) {
          onError(err)
        }
    }
  }
}

给定一些 <form> -

<form id="myform">
  <button type="button" name="b1">▶️ next value</button>
  <button type="button" name="b2">⏺ some error</button>
  <pre><output name="foo"></output></pre>
  <pre><output name="bar"></output></pre>
</form>

我们创建一个频道 A,并创建两个 <button> 发布者和两个 <output> 订阅者 -

const f = document.forms.myform
const A = channel()

// publishers
f.b1.addEventListener("click", event => A.publish(String(new Date)))
f.b2.addEventListener("click", event => A.publish(Error("error")))

// subscribers
A.subscribe(v => f.foo.value += v + "\n")
A.subscribe(v => f.bar.value += v + "\n")

每次点击 ▶️ 都会 publish 每个 subscriber -

一个值
render preview

点击 ⏺ 将 publish 控制台出错,但后续使用 ▶️ 发布仍然有效。

演示

运行 下面的代码片段在您自己的浏览器中验证结果 -

function duplexStream () {
  let t = defer()
  async function* read () { while (true) yield await t.deferred }
  function write (err, value) { if (err) t.reject(err); else t.resolve(value); t = defer() }
  return [read, write]
}

function defer () {
  let resolve, reject
  return { deferred: new Promise((res, rej) => (resolve = res, reject = rej)), resolve, reject }
}

function channel ([read, write] = duplexStream()) {
  return {
    publish(v) { if (v instanceof Error) write(v); else write(null, v) },
    async subscribe(onValue, onError = console.error) {
      while (true) try { for await (const v of read()) onValue(v) } catch (err) { onError(err) }
    }
  }
}

const f = document.forms.myform
const A = channel()
f.b1.addEventListener("click", event => A.publish(String(new Date)))
f.b2.addEventListener("click", event => A.publish(Error("error")))
A.subscribe(v => f.foo.value += v + "\n")
A.subscribe(v => f.bar.value += v + "\n")
pre { margin: 0.5rem 0; }
<form id="myform">
  <button type="button" name="b1">▶️ next value</button>
  <button type="button" name="b2">⏺ some error</button>
  <pre><output name="foo"></output></pre>
  <pre><output name="bar"></output></pre>
</form>