Redux Observable / RxJS:如何创建自定义 observable?

Redux Observable / RxJS: How to create custom observable?

我正在尝试在 redux-observable 史诗中进行 websocket 设置,并且我将采用类似于此人的方法:https://github.com/MichalZalecki/connect-rxjs-to-react/issues/1

然而,我第一次尝试连接起来似乎没有用,尽管它看起来和上面的人一样:

import 'rxjs';
import Observable from 'rxjs';

import * as scheduleActions from '../ducks/schedule';

export default function connectSocket(action$, store) {
  return action$.ofType(scheduleActions.CANCEL_RSVP)
    .map(action => {
      new Observable(observer => {
        // do websocket stuff here
        observer.next('message text');
      });
    })
    .map(text => {
      console.log("xxxxxxxxxxxxx: ", text);
      return scheduleActions.rsvpCancelled(1);
    });
};

但是,我收到 Object is not a constructor 错误:

===更新===

看来解构 { Observable } 导出的建议奏效了!

不是唯一的问题是 text 似乎没有过渡到下一个方法...

import 'rxjs';
import { Observable } from 'rxjs';

import * as scheduleActions from '../ducks/schedule';

export default function connectSocket(action$, store) {
  return action$.ofType(scheduleActions.CANCEL_RSVP)
    .map(action => {
      new Observable(observer => {
        // do websocket stuff here
        observer.next('message text');
      });
    })
    .map(text => {
      console.log("xxxxxxxxxxxxx: ", text); // prints undefined
      return scheduleActions.rsvpCancelled(1);
    });
};

在 RxJS v5 中,Observable class 可用作命名导出,而不是默认导出。

import { Observable } from 'rxjs';

从常规 rxjs 导入也将导入 RxJS 的 all(将所有运算符添加到 Observable 原型)。这被描述为 in the docs here。如果您希望更明确并且只导入 Observable 本身,您可以直接在 rxjs/Observable:

导入它
import { Observable } from 'rxjs/Observable';

另外,您在映射自定义 Observable 的方式上遇到了一些问题。

第一期

你实际上并没有 returning 它。呵呵。您缺少 return 语句(或者您可以删除大括号并使用隐式箭头函数 returns)。

第二期

当您 return 一个 Observable 时,常规 .map() 运算符不会做任何特殊的事情。如果您希望自定义 Observable 被订阅和展平,您需要使用一个运算符来进行某种展平。

最常见的两个是 mergeMap(又名 flatMap)或 switchMap

action$.ofType(scheduleActions.CANCEL_RSVP)
  .mergeMap(action => {
    return new Observable(observer => {
      // do websocket stuff here
      observer.next('message text');
    });
  })

您需要哪个运算符取决于您想要的行为。如果您还不熟悉,可以查看 documentation on the various operators or jump straight to the mergeMap and switchMap 文档。


如果您喜欢冒险,RxJS v5 确实提供开箱即用的 WebSocket 支持,您可以尝试使用 Observable.webSocket()。它没有很好地记录,但您也可以查看单元测试,对于简单的只读单向流,它很容易解释——提供 URL 并订阅。它实际上 令人难以置信 强大,如果你能弄清楚如何使用它,那就是。通过单个套接字支持双向、多路复用也就是复杂的多 input/output 通道。我们在 Netflix 将它用于几个具有数千 rps 的内部工具。

你可以看看Demo。访问 Create Custom Observable