RxJS:一个将自己的旧值作为输入的 Observable
RxJS: An Observable that takes its own old value as input
我正在尝试实现这样的东西:
// This obviously doesn't work, because we try to refer to `a` before it exists.
// c and m are some Observables.
const aSampled = a.pipe(
rxjs.operators.sample(c),
rxjs.operators.startWith(aInitial)
);
const a = m.pipe(
rxjs.operators.withLatestFrom(aSampled),
map(([mValue, oldAValue]) => {
// Do something.
}),
rxjs.operators.startWith(aInitial)
);
现在这显然是无法编译的废话,因为我们在创建它之前尝试 sample
a
,但希望它能使我的意图明确:a
发出的每个值都应该依赖于一个a
之前发出的旧值的一部分。 a
的哪个旧值取决于 c
最后一次发出某些东西的时间。这有点像在 a
上调用 pairwise
,除了我不想要最后两个值,而是最新的和后面的另一个值。
请注意,如果不是 startWith(aInitial)
位,这甚至不是一个定义明确的问题,因为 a
发出的第一个值将循环定义,引用自身。但是,只要单独指定第一个值 a
,该构造就具有数学意义。我只是不知道如何以干净的方式在代码中实现它。我的直觉是,通过编写某种自定义 Subject 来实现此目的会是一种冗长的方式,但非常欢迎更优雅的方式。
为了更具体一点,在我的用例中,我处理的是单击并拖动到平移类型的 UI 元素。 m
是 mousemove
事件的 Observable,c
是 mousedown
事件的 Observable。 a
将根据光标所在的位置以及单击发生时 a
的值不断变化。
您可以根据行为主题创建一个名为先前主题的新主题。
行为主题的来源在这里https://github.com/ReactiveX/rxjs/blob/master/src/internal/BehaviorSubject.ts。
所以让我们创建一个先前的主题,它发出当前值和先前的值。
import { Subject } from 'rxjs';
import { Subscriber } from 'rxjs';
import { Subscription } from 'rxjs';
import { SubscriptionLike } from 'rxjs';
import { ObjectUnsubscribedError } from 'rxjs';
export class PreviousSubject<T> extends Subject<T[]> {
_value: T[];
constructor(value: T, previous?: T) {
super();
this._value = [value, previous];
}
get value(): T[] {
return this.getValue();
}
/** @deprecated This is an internal implementation detail, do not use. */
_subscribe(subscriber: Subscriber<T[]>): Subscription {
const subscription = super._subscribe(subscriber);
if (subscription && !(<SubscriptionLike>subscription).closed) {
subscriber.next(this._value);
}
return subscription;
}
getValue(): T[] {
if (this.hasError) {
throw this.thrownError;
} else if (this.closed) {
throw new ObjectUnsubscribedError();
} else {
return this._value;
}
}
next(value: T): void {
this._value = [value, this._value[0]];
super.next(this._value);
}
}
在 https://stackblitz.com/edit/typescript-wiayqx
查看演示
如果你不想要 TypeScript,也可以使用 vanilla JS
const { Subject } = rxjs;
class PreviousSubject extends Subject {
_value;
constructor(value, previous) {
super();
this._value = [value, previous];
}
get value() {
return this.getValue();
}
_subscribe(subscriber) {
const subscription = super._subscribe(subscriber);
if (subscription && !(subscription).closed) {
subscriber.next(this._value);
}
return subscription;
}
getValue() {
if (this.hasError) {
throw this.thrownError;
} else if (this.closed) {
throw new ObjectUnsubscribedError();
} else {
return this._value;
}
}
next(value) {
this._value = [value, this._value[0]];
super.next(this._value);
}
}
let ps$ = new PreviousSubject('Start value');
ps$.subscribe(([current, previous]) => {
console.log(`Current: ${current}, Previous: ${previous}`);
});
ps$.next('Value');
ps$.next('Another value');
ps$.next('Another value again');
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
经过一番思考,这是我自己的解决方案,以及一个最小的工作示例:
// Setup for MWE.
// Note that c just emits once a second, the actual values of it don't matter.
const c = rxjs.interval(1000).pipe(rxjs.operators.take(2));
const m = rxjs.interval(300).pipe(rxjs.operators.take(9));
const aInitial = -1;
// Initialize a.
const a = new rxjs.BehaviorSubject();
a.next(aInitial);
// For testing, log the values a emits.
a.subscribe((val) => {
console.log(`a: ${val}`);
});
// Sample a at each emission of c.
const aSampled = a.pipe(
rxjs.operators.sample(c),
rxjs.operators.startWith(aInitial),
);
// Every time m emits, get also the latest sampled value of a, and do something
// with the two.
m.pipe(
rxjs.operators.withLatestFrom(aSampled),
rxjs.operators.map(([mValue, oldAValue]) => {
// In place of actually computing something useful, just log the arguments
// and for the sake of demonstration return their product.
console.log(`m: ${mValue}, old a at last c-emission: ${oldAValue}`);
return mValue*oldAValue;
}),
).subscribe(a); // Route the output back into a.
输出
a: -1
m: 0, old a at last c-emission: -1
a: 0
m: 1, old a at last c-emission: -1
a: -1
m: 2, old a at last c-emission: -1
a: -2
m: 3, old a at last c-emission: -2
a: -6
m: 4, old a at last c-emission: -2
a: -8
m: 5, old a at last c-emission: -2
a: -10
m: 6, old a at last c-emission: -10
a: -60
m: 7, old a at last c-emission: -10
a: -70
m: 8, old a at last c-emission: -10
a: -80
诀窍是将 a
设置为 Subject 允许我们首先发出其初始值,然后将其连接到管道,该管道的输入之一是较旧的采样值a
。 Plain Observables 不允许这样做(AFAIK),因为您必须在创建时定义它们的所有输入。不过不需要自定义主题 class。
如果我没理解错的话,基本的事件流是 mousemove
和一个 mousedown
。
基于这些事件流,您必须计算一个新流 a
,它以与 mousemove
相同的频率发出,但其数据是基于当前事件的一些计算结果鼠标的位置和最后一个 mousedown
发出时的值 a
。
所以,如果这是真的,我们可以用以下 Observables
模拟 mousemove
和 mousedown
// the mouse is clicked every 1 second
const c = interval(1000).pipe(
tap(cVal => console.log('click', cVal))
);
// the mouse moves diagonally and emits every 200 ms
const m = interval(200).pipe(
map(mVal => [mVal, mVal]),
);
我们需要的是在 mousedown
发出时以某种方式掌握 Observable a
的值。我们怎样才能得到这个?
假设我们有一个名为 value_of_a
的 BehaviourSubject
,其初始值为 1
并保存 a
的值。如果我们有这样的 Observable,我们可以在 mousedown
发射时得到它的值,就像这样
const last_relevant_a = c.pipe( // when mousedown emits
switchMap(() => value_of_a.pipe( // the control is switched to the Observable value_of_a
take(1), // and we consider only its first value
)),
);
使用 m
、mousemove Observable 和 last_relevant_a
我们拥有所有需要的 Observable。事实上,我们只需结合他们最新的排放量,就可以得到我们需要的所有元素来计算 a
.
的新值
const a = combineLatest(m, last_relevant_a)
.submit(
([mouseVal, old_a_val] => {
// do something to calculate the new value of a
}
);
现在唯一要做的就是确保 value_of_a
发出 a
发出的任何值。这可以在 a
本身的订阅中调用 value_of_a
上的 next
来完成。
将它们拼接在一起,解决方案可能类似于
const c = interval(1000).pipe(
tap(cVal => console.log('click', cVal))
);
const m = interval(200).pipe(
map(mVal => [mVal, mVal]),
);
const value_of_a = new BehaviorSubject<number>(1);
const last_relevant_a = c.pipe(
switchMap(cVal => value_of_a.pipe(
take(1),
)),
);
const a = combineLatest(m, last_relevant_a);
a.pipe(
take(20)
)
.subscribe(
val => {
// new value of a calculated with an arbitrary logic
const new_value_of_a = val[0][0] * val[0][1] * val[1];
// the new value of a is emitted by value_of_a
value_of_a.next(new_value_of_a);
}
)
可能这也是 expand
运算符的一个用例,但应该对其进行调查。
我正在尝试实现这样的东西:
// This obviously doesn't work, because we try to refer to `a` before it exists.
// c and m are some Observables.
const aSampled = a.pipe(
rxjs.operators.sample(c),
rxjs.operators.startWith(aInitial)
);
const a = m.pipe(
rxjs.operators.withLatestFrom(aSampled),
map(([mValue, oldAValue]) => {
// Do something.
}),
rxjs.operators.startWith(aInitial)
);
现在这显然是无法编译的废话,因为我们在创建它之前尝试 sample
a
,但希望它能使我的意图明确:a
发出的每个值都应该依赖于一个a
之前发出的旧值的一部分。 a
的哪个旧值取决于 c
最后一次发出某些东西的时间。这有点像在 a
上调用 pairwise
,除了我不想要最后两个值,而是最新的和后面的另一个值。
请注意,如果不是 startWith(aInitial)
位,这甚至不是一个定义明确的问题,因为 a
发出的第一个值将循环定义,引用自身。但是,只要单独指定第一个值 a
,该构造就具有数学意义。我只是不知道如何以干净的方式在代码中实现它。我的直觉是,通过编写某种自定义 Subject 来实现此目的会是一种冗长的方式,但非常欢迎更优雅的方式。
为了更具体一点,在我的用例中,我处理的是单击并拖动到平移类型的 UI 元素。 m
是 mousemove
事件的 Observable,c
是 mousedown
事件的 Observable。 a
将根据光标所在的位置以及单击发生时 a
的值不断变化。
您可以根据行为主题创建一个名为先前主题的新主题。
行为主题的来源在这里https://github.com/ReactiveX/rxjs/blob/master/src/internal/BehaviorSubject.ts。
所以让我们创建一个先前的主题,它发出当前值和先前的值。
import { Subject } from 'rxjs';
import { Subscriber } from 'rxjs';
import { Subscription } from 'rxjs';
import { SubscriptionLike } from 'rxjs';
import { ObjectUnsubscribedError } from 'rxjs';
export class PreviousSubject<T> extends Subject<T[]> {
_value: T[];
constructor(value: T, previous?: T) {
super();
this._value = [value, previous];
}
get value(): T[] {
return this.getValue();
}
/** @deprecated This is an internal implementation detail, do not use. */
_subscribe(subscriber: Subscriber<T[]>): Subscription {
const subscription = super._subscribe(subscriber);
if (subscription && !(<SubscriptionLike>subscription).closed) {
subscriber.next(this._value);
}
return subscription;
}
getValue(): T[] {
if (this.hasError) {
throw this.thrownError;
} else if (this.closed) {
throw new ObjectUnsubscribedError();
} else {
return this._value;
}
}
next(value: T): void {
this._value = [value, this._value[0]];
super.next(this._value);
}
}
在 https://stackblitz.com/edit/typescript-wiayqx
查看演示如果你不想要 TypeScript,也可以使用 vanilla JS
const { Subject } = rxjs;
class PreviousSubject extends Subject {
_value;
constructor(value, previous) {
super();
this._value = [value, previous];
}
get value() {
return this.getValue();
}
_subscribe(subscriber) {
const subscription = super._subscribe(subscriber);
if (subscription && !(subscription).closed) {
subscriber.next(this._value);
}
return subscription;
}
getValue() {
if (this.hasError) {
throw this.thrownError;
} else if (this.closed) {
throw new ObjectUnsubscribedError();
} else {
return this._value;
}
}
next(value) {
this._value = [value, this._value[0]];
super.next(this._value);
}
}
let ps$ = new PreviousSubject('Start value');
ps$.subscribe(([current, previous]) => {
console.log(`Current: ${current}, Previous: ${previous}`);
});
ps$.next('Value');
ps$.next('Another value');
ps$.next('Another value again');
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
经过一番思考,这是我自己的解决方案,以及一个最小的工作示例:
// Setup for MWE.
// Note that c just emits once a second, the actual values of it don't matter.
const c = rxjs.interval(1000).pipe(rxjs.operators.take(2));
const m = rxjs.interval(300).pipe(rxjs.operators.take(9));
const aInitial = -1;
// Initialize a.
const a = new rxjs.BehaviorSubject();
a.next(aInitial);
// For testing, log the values a emits.
a.subscribe((val) => {
console.log(`a: ${val}`);
});
// Sample a at each emission of c.
const aSampled = a.pipe(
rxjs.operators.sample(c),
rxjs.operators.startWith(aInitial),
);
// Every time m emits, get also the latest sampled value of a, and do something
// with the two.
m.pipe(
rxjs.operators.withLatestFrom(aSampled),
rxjs.operators.map(([mValue, oldAValue]) => {
// In place of actually computing something useful, just log the arguments
// and for the sake of demonstration return their product.
console.log(`m: ${mValue}, old a at last c-emission: ${oldAValue}`);
return mValue*oldAValue;
}),
).subscribe(a); // Route the output back into a.
输出
a: -1
m: 0, old a at last c-emission: -1
a: 0
m: 1, old a at last c-emission: -1
a: -1
m: 2, old a at last c-emission: -1
a: -2
m: 3, old a at last c-emission: -2
a: -6
m: 4, old a at last c-emission: -2
a: -8
m: 5, old a at last c-emission: -2
a: -10
m: 6, old a at last c-emission: -10
a: -60
m: 7, old a at last c-emission: -10
a: -70
m: 8, old a at last c-emission: -10
a: -80
诀窍是将 a
设置为 Subject 允许我们首先发出其初始值,然后将其连接到管道,该管道的输入之一是较旧的采样值a
。 Plain Observables 不允许这样做(AFAIK),因为您必须在创建时定义它们的所有输入。不过不需要自定义主题 class。
如果我没理解错的话,基本的事件流是 mousemove
和一个 mousedown
。
基于这些事件流,您必须计算一个新流 a
,它以与 mousemove
相同的频率发出,但其数据是基于当前事件的一些计算结果鼠标的位置和最后一个 mousedown
发出时的值 a
。
所以,如果这是真的,我们可以用以下 Observables
模拟mousemove
和 mousedown
// the mouse is clicked every 1 second
const c = interval(1000).pipe(
tap(cVal => console.log('click', cVal))
);
// the mouse moves diagonally and emits every 200 ms
const m = interval(200).pipe(
map(mVal => [mVal, mVal]),
);
我们需要的是在 mousedown
发出时以某种方式掌握 Observable a
的值。我们怎样才能得到这个?
假设我们有一个名为 value_of_a
的 BehaviourSubject
,其初始值为 1
并保存 a
的值。如果我们有这样的 Observable,我们可以在 mousedown
发射时得到它的值,就像这样
const last_relevant_a = c.pipe( // when mousedown emits
switchMap(() => value_of_a.pipe( // the control is switched to the Observable value_of_a
take(1), // and we consider only its first value
)),
);
使用 m
、mousemove Observable 和 last_relevant_a
我们拥有所有需要的 Observable。事实上,我们只需结合他们最新的排放量,就可以得到我们需要的所有元素来计算 a
.
const a = combineLatest(m, last_relevant_a)
.submit(
([mouseVal, old_a_val] => {
// do something to calculate the new value of a
}
);
现在唯一要做的就是确保 value_of_a
发出 a
发出的任何值。这可以在 a
本身的订阅中调用 value_of_a
上的 next
来完成。
将它们拼接在一起,解决方案可能类似于
const c = interval(1000).pipe(
tap(cVal => console.log('click', cVal))
);
const m = interval(200).pipe(
map(mVal => [mVal, mVal]),
);
const value_of_a = new BehaviorSubject<number>(1);
const last_relevant_a = c.pipe(
switchMap(cVal => value_of_a.pipe(
take(1),
)),
);
const a = combineLatest(m, last_relevant_a);
a.pipe(
take(20)
)
.subscribe(
val => {
// new value of a calculated with an arbitrary logic
const new_value_of_a = val[0][0] * val[0][1] * val[1];
// the new value of a is emitted by value_of_a
value_of_a.next(new_value_of_a);
}
)
可能这也是 expand
运算符的一个用例,但应该对其进行调查。