RxJS:一个将自己的旧值作为输入的 Observable

RxJS: An Observable that takes its own old value as input

我正在尝试实现这样的东西:

// This obviously doesn't work, because we try to refer to `a` before it exists.
// c and m are some Observables.
const aSampled = a.pipe(
  rxjs.operators.sample(c),
  rxjs.operators.startWith(aInitial)
);
const a = m.pipe(
  rxjs.operators.withLatestFrom(aSampled),
  map(([mValue, oldAValue]) => {
    // Do something.
  }),
  rxjs.operators.startWith(aInitial)
);

现在这显然是无法编译的废话,因为我们在创建它之前尝试 sample a,但希望它能使我的意图明确:a 发出的每个值都应该依赖于一个a 之前发出的旧值的一部分。 a 的哪个旧值取决于 c 最后一次发出某些东西的时间。这有点像在 a 上调用 pairwise,除了我不想要最后两个值,而是最新的和后面的另一个值。

请注意,如果不是 startWith(aInitial) 位,这甚至不是一个定义明确的问题,因为 a 发出的第一个值将循环定义,引用自身。但是,只要单独指定第一个值 a,该构造就具有数学意义。我只是不知道如何以干净的方式在代码中实现它。我的直觉是,通过编写某种自定义 Subject 来实现此目的会是一种冗长的方式,但非常欢迎更优雅的方式。

为了更具体一点,在我的用例中,我处理的是单击并拖动到平移类型的 UI 元素。 mmousemove 事件的 Observable,cmousedown 事件的 Observable。 a 将根据光标所在的位置以及单击发生时 a 的值不断变化。

您可以根据行为主题创建一个名为先前主题的新主题。

行为主题的来源在这里https://github.com/ReactiveX/rxjs/blob/master/src/internal/BehaviorSubject.ts

所以让我们创建一个先前的主题,它发出当前值和先前的值。

import { Subject } from 'rxjs';
import { Subscriber } from 'rxjs';
import { Subscription } from 'rxjs';
import { SubscriptionLike } from 'rxjs';
import { ObjectUnsubscribedError } from 'rxjs';

export class PreviousSubject<T> extends Subject<T[]> {

  _value: T[];

  constructor(value: T, previous?: T) {
    super();
    this._value = [value, previous];
  }

  get value(): T[] {
    return this.getValue();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _subscribe(subscriber: Subscriber<T[]>): Subscription {
    const subscription = super._subscribe(subscriber);
    if (subscription && !(<SubscriptionLike>subscription).closed) {
      subscriber.next(this._value);
    }
    return subscription;
  }

  getValue(): T[] {
    if (this.hasError) {
      throw this.thrownError;
    } else if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else {
      return this._value;
    }
  }

  next(value: T): void {
    this._value = [value, this._value[0]];
    super.next(this._value);
  }
}

https://stackblitz.com/edit/typescript-wiayqx

查看演示

如果你不想要 TypeScript,也可以使用 vanilla JS

const { Subject } = rxjs;

class PreviousSubject extends Subject {

  _value;

  constructor(value, previous) {
    super();
    this._value = [value, previous];
  }

  get value() {
    return this.getValue();
  }

  _subscribe(subscriber) {
    const subscription = super._subscribe(subscriber);
    if (subscription && !(subscription).closed) {
      subscriber.next(this._value);
    }
    return subscription;
  }

  getValue() {
    if (this.hasError) {
      throw this.thrownError;
    } else if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else {
      return this._value;
    }
  }

  next(value) {
    this._value = [value, this._value[0]];
    super.next(this._value);
  }
}

let ps$ = new PreviousSubject('Start value');

ps$.subscribe(([current, previous]) => {
  console.log(`Current: ${current}, Previous: ${previous}`);
});

ps$.next('Value');
ps$.next('Another value');
ps$.next('Another value again');
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>

经过一番思考,这是我自己的解决方案,以及一个最小的工作示例:

// Setup for MWE.
// Note that c just emits once a second, the actual values of it don't matter.
const c = rxjs.interval(1000).pipe(rxjs.operators.take(2));
const m = rxjs.interval(300).pipe(rxjs.operators.take(9));
const aInitial = -1;

// Initialize a.
const a = new rxjs.BehaviorSubject();
a.next(aInitial);

// For testing, log the values a emits.
a.subscribe((val) => {
  console.log(`a: ${val}`);
});

// Sample a at each emission of c.
const aSampled = a.pipe(
  rxjs.operators.sample(c),
  rxjs.operators.startWith(aInitial),
);

// Every time m emits, get also the latest sampled value of a, and do something
// with the two.
m.pipe(
  rxjs.operators.withLatestFrom(aSampled),
  rxjs.operators.map(([mValue, oldAValue]) => {
    // In place of actually computing something useful, just log the arguments
    // and for the sake of demonstration return their product.
    console.log(`m: ${mValue}, old a at last c-emission: ${oldAValue}`);
    return mValue*oldAValue;
  }),
).subscribe(a);  // Route the output back into a.

输出

a: -1
m: 0, old a at last c-emission: -1
a: 0
m: 1, old a at last c-emission: -1
a: -1
m: 2, old a at last c-emission: -1
a: -2
m: 3, old a at last c-emission: -2
a: -6
m: 4, old a at last c-emission: -2
a: -8
m: 5, old a at last c-emission: -2
a: -10
m: 6, old a at last c-emission: -10
a: -60
m: 7, old a at last c-emission: -10
a: -70
m: 8, old a at last c-emission: -10
a: -80

诀窍是将 a 设置为 Subject 允许我们首先发出其初始值,然后将其连接到管道,该管道的输入之一是较旧的采样值a。 Plain Observables 不允许这样做(AFAIK),因为您必须在创建时定义它们的所有输入。不过不需要自定义主题 class。

如果我没理解错的话,基本的事件流是 mousemove 和一个 mousedown

基于这些事件流,您必须计算一个新流 a,它以与 mousemove 相同的频率发出,但其数据是基于当前事件的一些计算结果鼠标的位置和最后一个 mousedown 发出时的值 a

所以,如果这是真的,我们可以用以下 Observables

模拟 mousemovemousedown
// the mouse is clicked every 1 second
const c = interval(1000).pipe(
    tap(cVal => console.log('click', cVal))
);
// the mouse moves diagonally and emits every 200 ms
const m = interval(200).pipe(
    map(mVal => [mVal, mVal]),
);

我们需要的是在 mousedown 发出时以某种方式掌握 Observable a 的值。我们怎样才能得到这个?

假设我们有一个名为 value_of_aBehaviourSubject,其初始值为 1 并保存 a 的值。如果我们有这样的 Observable,我们可以在 mousedown 发射时得到它的值,就像这样

const last_relevant_a = c.pipe(       // when mousedown emits
    switchMap(() => value_of_a.pipe(  // the control is switched to the Observable value_of_a
        take(1),                      // and we consider only its first value
    )),
);

使用 m、mousemove Observable 和 last_relevant_a 我们拥有所有需要的 Observable。事实上,我们只需结合他们最新的排放量,就可以得到我们需要的所有元素来计算 a.

的新值
const a = combineLatest(m, last_relevant_a)
.submit(
   ([mouseVal, old_a_val] => {
      // do something to calculate the new value of a
   }
);

现在唯一要做的就是确保 value_of_a 发出 a 发出的任何值。这可以在 a 本身的订阅中调用 value_of_a 上的 next 来完成。

将它们拼接在一起,解决方案可能类似于

const c = interval(1000).pipe(
    tap(cVal => console.log('click', cVal))
);

const m = interval(200).pipe(
    map(mVal => [mVal, mVal]),
);

const value_of_a = new BehaviorSubject<number>(1);

const last_relevant_a = c.pipe(
    switchMap(cVal => value_of_a.pipe(
        take(1),
    )),
);

const a = combineLatest(m, last_relevant_a);

a.pipe(
    take(20)
)
.subscribe(
    val => {
        // new value of a calculated with an arbitrary logic
        const new_value_of_a = val[0][0] * val[0][1] * val[1];
        // the new value of a is emitted by value_of_a
        value_of_a.next(new_value_of_a);
    }
)

可能这也是 expand 运算符的一个用例,但应该对其进行调查。