从 RxJS4 迁移到 RxJS5 - 实现观察者

Migrating from RxJS4 to RxJS5 - implementing an Observer

在我的一个项目中,我有以下代码,在迁移到 RxJS5 后,Rx.Observer 似乎不再被定义:

let index = 0;

let obsEnqueue = this.obsEnqueue = new Rx.Subject();

this.queueStream = Rx.Observable.create(obs => {
    var push = Rx.Observer.create(v => {             // ! error
        if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
            obs.next(v);
        }
    });
    return obsEnqueue.subscribe(push);
});

this.push = (v) => {
    obsEnqueue.next(v);
    index++;
};

这不再有效,因为 Rx.Observer 未定义

在迁移指南中:

https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md

它说:

Observer is an interface now

然而,这不应该意味着 Rx.Observer,即使它是一个接口,也不应该有一个 "static" 方法,称为 create。

总之,Rx.Observer好像已经不存在了。 我收到此错误:

TypeError: Cannot read property 'create' of undefined

我如何创建一个 Observer 以某种方式产生与我上面的代码类似的结果?

为什么不直接将 onNext 函数内联到订阅中?

this.queueStream = Rx.Observable.create(obs => {
    return obsEnqueue.subscribe(
      v => {
        if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
            obs.next(v);
        }
      }
    );
});

老实说,我不明白你的代码是做什么的,但即使 Observer class 不再存在,它主要被 Subscriber [=23= 取代了] 的使用方式几乎与 Observer.

相同

它有一个静态的Subscriber.create方法。参见 https://github.com/ReactiveX/rxjs/blob/master/src/Subscriber.ts#L32

此方法 returns 一个 Subscriber 稍后可以使用的对象,例如 obsEnqueue.subscribe(push);.

来源:

export interface Observer<T> {
  closed?: boolean;
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}

Observer<T> 是与 onNextonCompletedonError 方法的接口。接口只是一种语言结构。打字稿编译器仅使用它来对需要 Observer<T> 的对象进行类型检查。它在编译时被删除。

class Subscriber<T> 实现接口 Observer<T>。 这意味着 Subscriber<T> 是上述方法的实际具体 class。

所以你改用var push = Rx.Subscriber.create(v => { [...]

注:

在最初的 Rx 实现中,接口是 IObservable<T>IObserver<T> 并使用扩展方法来允许组合。当谈到 JS 时,他们必须在 Observable / Observer 本身的原型上有方法来启用组合——所以 class 本身有方法。

如果我理解这部分代码试图做什么...
我认为不,
我不明白你是怎么做到的 "simpler".
也许有什么需要改进的地方
就是让它更 "reusable",
使它成为一个模块 ?,
或者如果还没有类似的 Rx 运算符...

这可能是一次尝试

/*
    "dependencies": {
        "rxjs": "^5.0.2"
    }
*/

import {Observable, Observer, Subject, Subscriber} from "rxjs";

export interface ICircularQueue<T> extends Observable<T> {
    push(value: T): void;
}

/**
* on every push use 'NEXT' observer/subscription,
* in the order they've been subscribed,
* cycling back to 1st subscription after last
*/
export function create<T>(): ICircularQueue<T> {

    let index = 0;

    let obsEnqueue = new Subject<T>();

    let queueStream = Observable.create((obs: Observer<T>) => {

        let push = Subscriber.create<T>(v => {
            // ! error ?
            if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
                obs.next(v);
            }
        });

        return obsEnqueue.subscribe(push);
    });

    queueStream.push = (v: T) => {
        obsEnqueue.next(v);
        index++;
    };

    return queueStream;
}

然后是基础测试....

import * as CircularQueue from "./CircularQueue";
import * as assert from "assert";

const $in = (array: any[], x: any) => {
    for (let $x of array) {
        if ($x === x) { return true; }
    }
    return false;
};

describe("CircularQueue", () => {

    it("works", () => {
        let queue = CircularQueue.create();

        let result: number[] = [];
        queue.subscribe(x => {
            assert.ok($in([0, 4, 8], x));
            result.push(0);
        });
        queue.subscribe(x => {
            assert.ok($in([1, 5, 9], x));
            result.push(1);
        });
        queue.subscribe(x => {
            assert.ok($in([2, 6, 10], x));
            result.push(2);
        });
        queue.subscribe(x => {
            assert.ok($in([3, 7, 11], x));
            result.push(3);
        });

        for (let i = 0; i < 12; i++) {
            queue.push(i);
        }

        assert.equal(result.join(), "0,1,2,3,0,1,2,3,0,1,2,3");
    });
});