在发布订阅模型中将回调转换为 Promise
Convert Callback to Promise in Publish Subscribe Model
假设我们有一个名为 A
的服务,它的功能是 subscribe(callback)
。从服务订阅是一个开放的连接,可以随时接收数据,可以通过回调访问数据。
我们可以将此回调转换为 promise 吗?如果可以,怎么做?
样本
A.subscribe((error, data) => {
// do something with data
});
Can we convert this callback to promise?
不仅仅是一个承诺,不,因为一个承诺只能一次,只有一个履行价值(成功时) ,但您有一个 系列 值(通常称为“可观察值”)。因此,您不能将其转换为 return 承诺,除非您当然希望承诺仅使用 一个 值(例如,第一个)来实现。
您可以将其转换为异步迭代器(也许使用 async
生成器函数)。顾名思义,异步迭代器通过 return 一个 系列 承诺异步提供一系列值。我不能说异步迭代器是否适用于您的用例,但它是我想到的最接近基于承诺的可观察对象。
Here's 为 Angular 的可观察对象设计的可观察对象到异步迭代器的实现,但可以根据需要进行调整。 (遗憾的是,没有指明许可证,所以我无法将其复制到答案中。)
双工流
正如 TJ 所建议的,您不能用 单个 承诺表示订阅。但是,您只需要一个变量和重新创建承诺的能力。我使用了一种类似于 TJ link 中描述的技术,但在抽象方面有显着差异。下面的 duplexStream
为程序中的任何流消费者提供了一个 read
函数,以及一个用于流生产者的 write
函数 -
function duplexStream () {
let t = defer()
async function* read () {
while (true) yield await t.deferred
}
function write (err, value) {
if (err) t.reject(err)
else t.resolve(value)
t = defer()
}
return [read, write]
}
defer
是提供外部控制承诺的简单抽象 -
function defer () {
let resolve, reject
return { deferred: new Promise((res, rej) => (resolve = res, reject = rej)), resolve, reject }
}
假设我们有两个 HTML 元素 -
<p id="foo"></p>
<p id="bar"></p>
频道
duplexStream
是通用的,不依赖于特定的库或 class。我们现在可以使用它将可观察对象转换为异步迭代器。注意 read
可以被多次调用,所有读者都将获得传递给 write
的值。 write
也可以给任意数量的生产者。通过向调用者提供 read
和 write
作为通用函数,可以在普通 publish
和 subscribe
事件期间管理这些处理程序。
在 duplexStream
之上进行抽象,我们使用熟悉的 publish
和 subscribe
方法创建了一个 channel
-
function channel ([read, write] = duplexStream()) {
return {
publish(v) {
if (v instanceof Error)
write(v)
else
write(null, v)
},
async subscribe(onValue, onError = console.error) {
while (true)
try {
for await (const v of read())
onValue(v)
}
catch (err) {
onError(err)
}
}
}
}
给定一些 <form>
-
<form id="myform">
<button type="button" name="b1">▶️ next value</button>
<button type="button" name="b2">⏺ some error</button>
<pre><output name="foo"></output></pre>
<pre><output name="bar"></output></pre>
</form>
我们创建一个频道 A
,并创建两个 <button>
发布者和两个 <output>
订阅者 -
const f = document.forms.myform
const A = channel()
// publishers
f.b1.addEventListener("click", event => A.publish(String(new Date)))
f.b2.addEventListener("click", event => A.publish(Error("error")))
// subscribers
A.subscribe(v => f.foo.value += v + "\n")
A.subscribe(v => f.bar.value += v + "\n")
每次点击 ▶️ 都会 publish
每个 subscribe
r -
一个值
render preview
点击 ⏺ 将 publish
控制台出错,但后续使用 ▶️ 发布仍然有效。
演示
运行 下面的代码片段在您自己的浏览器中验证结果 -
function duplexStream () {
let t = defer()
async function* read () { while (true) yield await t.deferred }
function write (err, value) { if (err) t.reject(err); else t.resolve(value); t = defer() }
return [read, write]
}
function defer () {
let resolve, reject
return { deferred: new Promise((res, rej) => (resolve = res, reject = rej)), resolve, reject }
}
function channel ([read, write] = duplexStream()) {
return {
publish(v) { if (v instanceof Error) write(v); else write(null, v) },
async subscribe(onValue, onError = console.error) {
while (true) try { for await (const v of read()) onValue(v) } catch (err) { onError(err) }
}
}
}
const f = document.forms.myform
const A = channel()
f.b1.addEventListener("click", event => A.publish(String(new Date)))
f.b2.addEventListener("click", event => A.publish(Error("error")))
A.subscribe(v => f.foo.value += v + "\n")
A.subscribe(v => f.bar.value += v + "\n")
pre { margin: 0.5rem 0; }
<form id="myform">
<button type="button" name="b1">▶️ next value</button>
<button type="button" name="b2">⏺ some error</button>
<pre><output name="foo"></output></pre>
<pre><output name="bar"></output></pre>
</form>
假设我们有一个名为 A
的服务,它的功能是 subscribe(callback)
。从服务订阅是一个开放的连接,可以随时接收数据,可以通过回调访问数据。
我们可以将此回调转换为 promise 吗?如果可以,怎么做?
样本
A.subscribe((error, data) => {
// do something with data
});
Can we convert this callback to promise?
不仅仅是一个承诺,不,因为一个承诺只能一次,只有一个履行价值(成功时) ,但您有一个 系列 值(通常称为“可观察值”)。因此,您不能将其转换为 return 承诺,除非您当然希望承诺仅使用 一个 值(例如,第一个)来实现。
您可以将其转换为异步迭代器(也许使用 async
生成器函数)。顾名思义,异步迭代器通过 return 一个 系列 承诺异步提供一系列值。我不能说异步迭代器是否适用于您的用例,但它是我想到的最接近基于承诺的可观察对象。
Here's 为 Angular 的可观察对象设计的可观察对象到异步迭代器的实现,但可以根据需要进行调整。 (遗憾的是,没有指明许可证,所以我无法将其复制到答案中。)
双工流
正如 TJ 所建议的,您不能用 单个 承诺表示订阅。但是,您只需要一个变量和重新创建承诺的能力。我使用了一种类似于 TJ link 中描述的技术,但在抽象方面有显着差异。下面的 duplexStream
为程序中的任何流消费者提供了一个 read
函数,以及一个用于流生产者的 write
函数 -
function duplexStream () {
let t = defer()
async function* read () {
while (true) yield await t.deferred
}
function write (err, value) {
if (err) t.reject(err)
else t.resolve(value)
t = defer()
}
return [read, write]
}
defer
是提供外部控制承诺的简单抽象 -
function defer () {
let resolve, reject
return { deferred: new Promise((res, rej) => (resolve = res, reject = rej)), resolve, reject }
}
假设我们有两个 HTML 元素 -
<p id="foo"></p>
<p id="bar"></p>
频道
duplexStream
是通用的,不依赖于特定的库或 class。我们现在可以使用它将可观察对象转换为异步迭代器。注意 read
可以被多次调用,所有读者都将获得传递给 write
的值。 write
也可以给任意数量的生产者。通过向调用者提供 read
和 write
作为通用函数,可以在普通 publish
和 subscribe
事件期间管理这些处理程序。
在 duplexStream
之上进行抽象,我们使用熟悉的 publish
和 subscribe
方法创建了一个 channel
-
function channel ([read, write] = duplexStream()) {
return {
publish(v) {
if (v instanceof Error)
write(v)
else
write(null, v)
},
async subscribe(onValue, onError = console.error) {
while (true)
try {
for await (const v of read())
onValue(v)
}
catch (err) {
onError(err)
}
}
}
}
给定一些 <form>
-
<form id="myform">
<button type="button" name="b1">▶️ next value</button>
<button type="button" name="b2">⏺ some error</button>
<pre><output name="foo"></output></pre>
<pre><output name="bar"></output></pre>
</form>
我们创建一个频道 A
,并创建两个 <button>
发布者和两个 <output>
订阅者 -
const f = document.forms.myform
const A = channel()
// publishers
f.b1.addEventListener("click", event => A.publish(String(new Date)))
f.b2.addEventListener("click", event => A.publish(Error("error")))
// subscribers
A.subscribe(v => f.foo.value += v + "\n")
A.subscribe(v => f.bar.value += v + "\n")
每次点击 ▶️ 都会 publish
每个 subscribe
r -
render preview |
---|
点击 ⏺ 将 publish
控制台出错,但后续使用 ▶️ 发布仍然有效。
演示
运行 下面的代码片段在您自己的浏览器中验证结果 -
function duplexStream () {
let t = defer()
async function* read () { while (true) yield await t.deferred }
function write (err, value) { if (err) t.reject(err); else t.resolve(value); t = defer() }
return [read, write]
}
function defer () {
let resolve, reject
return { deferred: new Promise((res, rej) => (resolve = res, reject = rej)), resolve, reject }
}
function channel ([read, write] = duplexStream()) {
return {
publish(v) { if (v instanceof Error) write(v); else write(null, v) },
async subscribe(onValue, onError = console.error) {
while (true) try { for await (const v of read()) onValue(v) } catch (err) { onError(err) }
}
}
}
const f = document.forms.myform
const A = channel()
f.b1.addEventListener("click", event => A.publish(String(new Date)))
f.b2.addEventListener("click", event => A.publish(Error("error")))
A.subscribe(v => f.foo.value += v + "\n")
A.subscribe(v => f.bar.value += v + "\n")
pre { margin: 0.5rem 0; }
<form id="myform">
<button type="button" name="b1">▶️ next value</button>
<button type="button" name="b2">⏺ some error</button>
<pre><output name="foo"></output></pre>
<pre><output name="bar"></output></pre>
</form>