从 RxJS4 迁移到 RxJS5 - 实现观察者
Migrating from RxJS4 to RxJS5 - implementing an Observer
在我的一个项目中,我有以下代码,在迁移到 RxJS5 后,Rx.Observer 似乎不再被定义:
let index = 0;
let obsEnqueue = this.obsEnqueue = new Rx.Subject();
this.queueStream = Rx.Observable.create(obs => {
var push = Rx.Observer.create(v => { // ! error
if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
obs.next(v);
}
});
return obsEnqueue.subscribe(push);
});
this.push = (v) => {
obsEnqueue.next(v);
index++;
};
这不再有效,因为 Rx.Observer
未定义
在迁移指南中:
https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md
它说:
Observer is an interface now
然而,这不应该意味着 Rx.Observer,即使它是一个接口,也不应该有一个 "static" 方法,称为 create。
总之,Rx.Observer
好像已经不存在了。
我收到此错误:
TypeError: Cannot read property 'create' of undefined
我如何创建一个 Observer 以某种方式产生与我上面的代码类似的结果?
为什么不直接将 onNext 函数内联到订阅中?
this.queueStream = Rx.Observable.create(obs => {
return obsEnqueue.subscribe(
v => {
if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
obs.next(v);
}
}
);
});
老实说,我不明白你的代码是做什么的,但即使 Observer
class 不再存在,它主要被 Subscriber
[=23= 取代了] 的使用方式几乎与 Observer
.
相同
它有一个静态的Subscriber.create
方法。参见 https://github.com/ReactiveX/rxjs/blob/master/src/Subscriber.ts#L32
此方法 returns 一个 Subscriber
稍后可以使用的对象,例如 obsEnqueue.subscribe(push);
.
来源:
export interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
Observer<T>
是与 onNext
、onCompleted
和 onError
方法的接口。接口只是一种语言结构。打字稿编译器仅使用它来对需要 Observer<T>
的对象进行类型检查。它在编译时被删除。
class Subscriber<T>
实现接口 Observer<T>
。
这意味着 Subscriber<T>
是上述方法的实际具体 class。
所以你改用var push = Rx.Subscriber.create(v => { [...]
。
注:
在最初的 Rx 实现中,接口是 IObservable<T>
和 IObserver<T>
并使用扩展方法来允许组合。当谈到 JS 时,他们必须在 Observable
/ Observer
本身的原型上有方法来启用组合——所以 class 本身有方法。
如果我理解这部分代码试图做什么...
我认为不,
我不明白你是怎么做到的 "simpler".
也许有什么需要改进的地方
就是让它更 "reusable",
使它成为一个模块 ?,
或者如果还没有类似的 Rx 运算符...
这可能是一次尝试
/*
"dependencies": {
"rxjs": "^5.0.2"
}
*/
import {Observable, Observer, Subject, Subscriber} from "rxjs";
export interface ICircularQueue<T> extends Observable<T> {
push(value: T): void;
}
/**
* on every push use 'NEXT' observer/subscription,
* in the order they've been subscribed,
* cycling back to 1st subscription after last
*/
export function create<T>(): ICircularQueue<T> {
let index = 0;
let obsEnqueue = new Subject<T>();
let queueStream = Observable.create((obs: Observer<T>) => {
let push = Subscriber.create<T>(v => {
// ! error ?
if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
obs.next(v);
}
});
return obsEnqueue.subscribe(push);
});
queueStream.push = (v: T) => {
obsEnqueue.next(v);
index++;
};
return queueStream;
}
然后是基础测试....
import * as CircularQueue from "./CircularQueue";
import * as assert from "assert";
const $in = (array: any[], x: any) => {
for (let $x of array) {
if ($x === x) { return true; }
}
return false;
};
describe("CircularQueue", () => {
it("works", () => {
let queue = CircularQueue.create();
let result: number[] = [];
queue.subscribe(x => {
assert.ok($in([0, 4, 8], x));
result.push(0);
});
queue.subscribe(x => {
assert.ok($in([1, 5, 9], x));
result.push(1);
});
queue.subscribe(x => {
assert.ok($in([2, 6, 10], x));
result.push(2);
});
queue.subscribe(x => {
assert.ok($in([3, 7, 11], x));
result.push(3);
});
for (let i = 0; i < 12; i++) {
queue.push(i);
}
assert.equal(result.join(), "0,1,2,3,0,1,2,3,0,1,2,3");
});
});
在我的一个项目中,我有以下代码,在迁移到 RxJS5 后,Rx.Observer 似乎不再被定义:
let index = 0;
let obsEnqueue = this.obsEnqueue = new Rx.Subject();
this.queueStream = Rx.Observable.create(obs => {
var push = Rx.Observer.create(v => { // ! error
if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
obs.next(v);
}
});
return obsEnqueue.subscribe(push);
});
this.push = (v) => {
obsEnqueue.next(v);
index++;
};
这不再有效,因为 Rx.Observer
未定义
在迁移指南中:
https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md
它说:
Observer is an interface now
然而,这不应该意味着 Rx.Observer,即使它是一个接口,也不应该有一个 "static" 方法,称为 create。
总之,Rx.Observer
好像已经不存在了。
我收到此错误:
TypeError: Cannot read property 'create' of undefined
我如何创建一个 Observer 以某种方式产生与我上面的代码类似的结果?
为什么不直接将 onNext 函数内联到订阅中?
this.queueStream = Rx.Observable.create(obs => {
return obsEnqueue.subscribe(
v => {
if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
obs.next(v);
}
}
);
});
老实说,我不明白你的代码是做什么的,但即使 Observer
class 不再存在,它主要被 Subscriber
[=23= 取代了] 的使用方式几乎与 Observer
.
它有一个静态的Subscriber.create
方法。参见 https://github.com/ReactiveX/rxjs/blob/master/src/Subscriber.ts#L32
此方法 returns 一个 Subscriber
稍后可以使用的对象,例如 obsEnqueue.subscribe(push);
.
来源:
export interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
Observer<T>
是与 onNext
、onCompleted
和 onError
方法的接口。接口只是一种语言结构。打字稿编译器仅使用它来对需要 Observer<T>
的对象进行类型检查。它在编译时被删除。
class Subscriber<T>
实现接口 Observer<T>
。
这意味着 Subscriber<T>
是上述方法的实际具体 class。
所以你改用var push = Rx.Subscriber.create(v => { [...]
。
注:
在最初的 Rx 实现中,接口是 IObservable<T>
和 IObserver<T>
并使用扩展方法来允许组合。当谈到 JS 时,他们必须在 Observable
/ Observer
本身的原型上有方法来启用组合——所以 class 本身有方法。
如果我理解这部分代码试图做什么...
我认为不,
我不明白你是怎么做到的 "simpler".
也许有什么需要改进的地方
就是让它更 "reusable",
使它成为一个模块 ?,
或者如果还没有类似的 Rx 运算符...
这可能是一次尝试
/*
"dependencies": {
"rxjs": "^5.0.2"
}
*/
import {Observable, Observer, Subject, Subscriber} from "rxjs";
export interface ICircularQueue<T> extends Observable<T> {
push(value: T): void;
}
/**
* on every push use 'NEXT' observer/subscription,
* in the order they've been subscribed,
* cycling back to 1st subscription after last
*/
export function create<T>(): ICircularQueue<T> {
let index = 0;
let obsEnqueue = new Subject<T>();
let queueStream = Observable.create((obs: Observer<T>) => {
let push = Subscriber.create<T>(v => {
// ! error ?
if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
obs.next(v);
}
});
return obsEnqueue.subscribe(push);
});
queueStream.push = (v: T) => {
obsEnqueue.next(v);
index++;
};
return queueStream;
}
然后是基础测试....
import * as CircularQueue from "./CircularQueue";
import * as assert from "assert";
const $in = (array: any[], x: any) => {
for (let $x of array) {
if ($x === x) { return true; }
}
return false;
};
describe("CircularQueue", () => {
it("works", () => {
let queue = CircularQueue.create();
let result: number[] = [];
queue.subscribe(x => {
assert.ok($in([0, 4, 8], x));
result.push(0);
});
queue.subscribe(x => {
assert.ok($in([1, 5, 9], x));
result.push(1);
});
queue.subscribe(x => {
assert.ok($in([2, 6, 10], x));
result.push(2);
});
queue.subscribe(x => {
assert.ok($in([3, 7, 11], x));
result.push(3);
});
for (let i = 0; i < 12; i++) {
queue.push(i);
}
assert.equal(result.join(), "0,1,2,3,0,1,2,3,0,1,2,3");
});
});