RxJS 自定义操作符内部变量

RxJS Custom Operator Internal Variables

在 RxJS 中 using/mutating 来自自定义运算符闭包的变量有缺点吗?我意识到它违反了 "pure" 功能原则,您可以使用 scan 作为这个简单的示例,但我特别询问以下基础模式的具体技术问题:

const custom = () => {

  let state = 0; 

  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  )
}

// Usage
const obs = interval(1000).pipe(custom())

obs.subscribe()

正如您已经说过的,您失去了纯函数的一些优势。在这种特殊情况下,您 运行 延迟订阅者获得与您预期不同的数据流的风险(取决于您在实际情况中所做的与在这个构建的情况中所做的)。

例如,通过添加迟到的订阅者,流 'A' 会看到 0 和 1。流 'B' 只会看到“1”(它会跳过 0,因为 obs 从 'A' subscriber.Stream 'C' 会表现得像 stream 'A'.

const { interval, pipe, subscribe } = Rx;
const { take, map, tap, share  } = RxOperators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  )
}

// Late subscribers can get different streams
const obs = interval(500).pipe(custom())
const sub1 = obs.pipe(take(2)).subscribe((x) => console.log('A', x))
setTimeout(() => obs.pipe(take(1)).subscribe((x) => console.log('B', x)), 500)
setTimeout(() => obs.pipe(take(3)).subscribe((x) => console.log('C', x)), 3000)

这是可接受的还是预期的行为将取决于您的用例。虽然尝试使用纯函数以发挥其所有优势是件好事,但有时它不切实际或不适合您的用例。

您在 custom 运算符中存储状态的方式至少存在两个问题。

第一个问题是您这样做意味着运算符不再是引用透明的。也就是说,如果将运算符的调用替换为运算符的 return 值,则行为不同:

const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  );
};

const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

第二个问题 - 正如另一个答案中提到的那样 - 不同的订阅将在其 next 通知中收到不同的值,因为运算符中的状态是共享的。

例如,如果源 observable 是同步的,连续的订阅将看到不同的值:

const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  );
};

const source = range(1, 2).pipe(custom());
console.log("first subscription:");
source.subscribe(n  => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

但是,可以编写一个与您的 custom 运算符非常相似的运算符,并使其在所有情况下都能正确运行。为此,有必要确保运算符中的任何状态都是 per-subscription.

管道运算符只是一个函数,它接受一个可观察对象并且 returns 是一个可观察对象,因此您可以使用 defer 来确保您的状态是按订阅的,如下所示:

const { defer, pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  return source => defer(() => {
    let state = 0; 
    return source.pipe(
      map(next => state * next),
      tap(_ => state += 1)
    );
  }).pipe(share());
};

const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));

const source = range(1, 2).pipe(op);
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>