我如何创建一个像 defer 使用 Observable.create 返回的可观察对象?

How can I create an observable like the one returned by defer using Observable.create?

我正在尝试创建一个行为类似于 defer 返回的行为的可观察对象,但使用的是 create 方法。所以我尝试:

const obs = Observable.create(function(observer) {
   from(fetch('https://jsonplaceholder.typicode.com/todos/1').then(console.log('fetch done'))).subscribe(observer)
})

setTimeout(()=>obs.subscribe((resp)=>console.log(resp.statusText)), 5000)

但是当我 运行 它在 node 它只是打印 "fetch done" (在订阅时,如预期的 5 秒后)但永远在那里等待。

当我将 from(..) 包裹在 setImmediate()setTimeout(,0) 中时,有时它会打印两条消息("Fetch done"、"OK")并退出和 有时它只打印"fetch done"并永远等待。

代码:

const obs = Observable.create(function(observer) {
    setTimeout(()=>from(fetch('https://jsonplaceholder.typicode.com/todos/1').then(console.log('fetch done'))).subscribe(observer), 0)
})

setTimeout(()=>obs.subscribe((resp)=>console.log(resp.statusText)), 5000)

为什么会这样?我做错了什么?

首先是 Observable.create 已弃用,您应该只使用 new Observable():

当像这样创建 Observables 时,您可以访问 observer 对象,您可以在其中调用 next()complete() 所以在您的情况下它会像这样:

new Observable(observer => {
  fetch('https://jsonplaceholder.typicode.com/todos/1').then(response => {
    observer.next(response);
    observer.complete();
  });
});

我认为这也应该有效:

new Observable(observer => {
  const sub =  from(fetch('https://jsonplaceholder.typicode.com/todos/1')).subscribe(observer);
  // Return tear-down function so you can abort request.
  return () => sub.unsubscribe();
});

显然,这太复杂了,如果你只想用 Observable 包装一个 Promise,你可以只使用 from().

Promise.then 接受一个回调函数,你需要 return 参数来链接它,所以它变成 then(result => { console.log('fetch done', result); return result; })