如何重置扫描操作符或流?

How can I reset a scan operator or stream?

message$ = new Subject();

method(type) {
    return this.message$.pipe(
        filter((element) => element.type === type),
        scan((acc, val) => [...acc, val], [])
    )
}

method.subscribe()

我想通过另一种方法在部分扫描操作中重置我的流。

 reset() {
   // reset logic
}

如果我尝试使用 message$.next() 重置消息,它仍然不会重置 scun,但会使重置值成为流的一部分。

我怎么能做到这一点?

一旦您理解了这个概念,就有一个简单的解决方案。您可以将在扫描中更新状态的逻辑移动到 higher order function and combine multiple different state interactions using the merge 运算符。

const { Subject, merge } = rxjs;
const { scan, map } = rxjs.operators;

message$ = new Subject();
reset$ = new Subject();

add = (message) => (state) => [...state, message];
clear = () => (state) => [];

const result$ = merge(
  // The function add is applied to message$ and the outer function is called with the message value
  message$.pipe(map(add)),
  // The function clear is applied to reset$ and the outer function is called
  reset$.pipe(map(clear))
).pipe(
  // The inner function is now fn. In case of add its now: (state) => [...state, message]
  // The function is called with the previous state and the new update is now the value of the scan.
  scan((state, fn) => fn(state), [])
)

result$.subscribe(console.log)

message$.next("Hello")
message$.next("Goodbye")
reset$.next()
message$.next("Hello again!")
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>

您可以将此解决方案用于 scan 的每个状态操作。您编写的用于操作 scan-state 的函数不受限制。另一个示例可能是您只删除最后一个值的函数:

// Subject to trigger
const deleteLastValue = new Subject();

// Function that manipulates the state
const deleteLast = () => (state) => state.slice(0, -1)

// Apply the function to the observable/subject.
deleteLastValue$.pipe(map(deleteLast))