RxJS - 缓冲唯一值并在 x 秒内没有发出其他值时发出?

RxJS - Buffer unique values and emit when no other values have been emitted for x seconds?

用户在表单中键入值,每次用户编辑特定字段时都会发出一个事件,值是他们编辑的字段。

例如,用户在描述字段中输入 3 次,然后在名称字段中输入两次,看起来像

"description" => "description" => "description" => "name" => "name" => ...

我想缓冲 unique 值,并在用户停止输入 x 秒后将它们作为数组发出。值 可能 再次出现在不同的缓冲区 window 中。

本质上这是为了跟踪当用户停止输入时更新了哪些字段并与服务器通信以保存编辑的值。


到目前为止,我有这个 3000 毫秒发出一次,而且它不会在缓冲时防止重复,而是我们之后 "deduplicate" 数组。

   this.update$
      .bufferTime(3000)
      .filter(buffer => buffer.length > 0)
      .map(buffer => [...new Set(buffer)])
      .subscribe(x => console.log(x));

所以它应该监听直到发出一个值,然后缓冲 unique直到 不再为 [=24 发出值=]x 秒,然后发出缓冲区并重复。怎样才能做到这一点?

可能是我的问题不够清楚,反正我是这样解决的:(以防对别人有帮助)

为了让缓冲区在流静默 3 秒时仅 发出,我会在每次用户键入内容时启动一个新计时器(事件在 update$ 上发出), 并用switchMap取消前面的

this.update$
  .buffer(this.update$.switchMap(x => Observable.timer(3000)))
  .filter(buffer => buffer.length > 0)
  .map(buffer => [...new Set(buffer)])
  .subscribe(console.log);

然后为了让缓冲区本身是唯一的而不是必须手动删除重复数据,我必须创建一个自定义运算符 uniqueBuffer

this.update$
  .uniqueBuffer(this.update$.switchMap(x => Observable.timer(3000)))
  .filter(buffer => buffer.length > 0)
  .subscribe(console.log);

function uniqueBuffer(emitObservable) {
  return Observable.create(subscriber => {
    const source = this;
    const uniqueBuffer = new Set();

    const subscription = source.subscribe(value => {
      uniqueBuffer.add(value);
    });

    emitObservable.subscribe(emit => {
      subscriber.next([...uniqueBuffer]);
      uniqueBuffer.clear();
    })
  })
}

Observable.prototype.uniqueBuffer = uniqueBuffer;

这可能是另一个版本:

const { Observable } = Rx;

const log = (prefix) => (...args) => { console.log(prefix, ...args); };

const inputs = document.querySelectorAll('input');

const updates$ = Observable
  .fromEvent(inputs, 'input')
  .pluck('target', 'id')
;

// wait x ms after last update
const flush$ = updates$.debounceTime(3000);

const buffered$ = updates$
  // use `distinct` without `keySelector`, but reset with flush$
  .distinct(null, flush$)
  
  // flush the buffer using flush$ as `notifier`
  .buffer(flush$)
;

buffered$.subscribe(log('buffered$ =>'));
<script src="https://unpkg.com/@reactivex/rxjs@^5/dist/global/Rx.min.js"></script>

<div><input type="text" placeholder="foo" id="input.foo"></div>
<div><input type="text" placeholder="bar" id="input.bar"></div>
<div><input type="text" placeholder="baz" id="input.baz"></div>