在 RxJS 中是否可以在 auditTime() 操作中测试未决值?

In RxJS is it possible to test for pending values in an auditTime() operation?

我正在使用 RxJS 的 .auditTime(500) 操作 (docs) 作为尾随节流阀:我想最多每 500 毫秒发出一次服务器调用。

服务器调用完成后的下游我需要查询是否有更多待处理的服务器调用,或者缓冲区现在是否已清除,以便我可以以状态消息的形式将此信息传达给用户,例如 "Saving…" 和 "Saved".

大致如下所示。

saveToServerObservable
  .do(() => {
    // gets called every time
    setStatus(Status.SAVING);
  })
  .auditTime(500) // wait 500 ms and emit no more than once per 500 ms
  .flatMap(data => axios({
    method: "post",
    url: "/saveurl",
    data: data,
  }))
  .map(response => response.data)
  .do(data => {
    // here I want to know whether there are pending values from the
    // auditTime() operation above or if the buffer is currently clear
    const pendingSaves = ???;
    if (!pendingSaves) {
     setStatus(Status.SAVED);
    }
  })
  .subscribe();

正如您在最后的 .do() 操作中看到的,我想知道是否有来自 .auditTime(500) 操作的未决值。我怎样才能实现这样的目标?

干杯!

我认为您可以使用 scan 并通过稍微修改您的链来实现您想要的:

const inc = new Subject();
const dec = new Subject();

const counter = Observable.merge(dec.mapTo(-1), inc.throttleTime(500).mapTo(1))
    .scan((acc, val) => acc + val, 0)
    .map(val => val > 0);

saveToServerObservable
  .do(() => {
    // gets called every time
    setStatus(Status.SAVING);
    inc.next();
  })
  .auditTime(500) // wait 500 ms and emit no more than once per 500 ms
  .flatMap(data => axios({
    method: "post",
    url: "/saveurl",
    data: data,
  }))
  .do(() => dec.next())
  .map(response => response.data)
  .withLatestFrom(counter, (data, pendingSaves) => {
    if (!pendingSaves) {
     setStatus(Status.SAVED);
    }
  })
  .subscribe();

整个想法在 counter Observable 中合并了 incdec。这两个 Observables 使用 scan() 递增和递减计数器。

inc 也与 .throttleTime(500) 链接在一起,与 .auditTime(500) 完全相反,因为当您调用 setStatus(Status.SAVING); 时,您总是知道这会使 .auditTime(500) 发出一个项目,因此您可以立即增加计数器。

然后 withLatestFrom 只是将计数器与远程调用的结果合并,这是您可以检查 counter.

的最新发射的地方

递增和递减计数器太容易出现错误,所以我最终采用了一种完全不同的方法。我现在单独跟踪本地数据是否 "dirty." 我使用这个脏信号向用户显示 "Saving…" vs "Saved" 消息:

  1. 每次用户进行本地编辑时,我在本地更新数据并将dirty设置为true
  2. 每次保存操作后,服务器都会用它拥有的最新版本的数据进行响应。
  3. 收到该响应后,我将本地版本的数据与服务器返回的数据进行比较,如果它们匹配,我将 dirty 设置为 false

在每次编辑时将 dirty 设置为 true

这里我为每次用户进行编辑定义了一个Rx.Subject。每次收到信号,我就把dirty设置为true

// stream of signals to save the active document
const userEditSignal$ = new Rx.Subject();

const savePrototype = () => {
  userEditSignal$.next();
};

userEditSignal$.subscribe(() => {
  // runs for each call to save the active document
  store.commit("SET_DIRTY", true);
});

观察dirty状态来决定何时保存到服务器

这让我们知道每次 dirty 值更改时,这与每次设置时都不一样。

const observeState = (store, getter) => {
  // irrelevant details redacted
}

// emits only when `dirty` changes, not every time it's set
const shouldSaveToServer$ = observeState(store, state => state.dirty);

创建请求对象和服务器响应的流

此自定义计时逻辑取代了对 auditTime() 运算符的需要。

const saveToServerSignal$ = shouldSaveToServer$.switchMap(shouldSave => {
  return shouldSave ?
    // as long as we should save, save every 500 ms
    Rx.Observable.interval(500) :
    // when we should not, stop
    Rx.Observable.never();
});

// create a request object for each save-to-server signal
const saveRequest$ = saveToServerSignal$
  .mapTo(store.state.activeDocument)
  .map(createSaveRequest);

// 
const saveResponse$ = saveRequest$
  // sends immediately
  .flatMap(request => axios(request));

在每次响应时,检查本地文档和从服务器返回的版本的差异

如果他们同意,我们可以将dirty设置为false

saveResponse$
  .map(response => response.data)
  .do(savedDocument => {
    const activeDocument = store.state.activeDocument;

    // update just `created`, `modified`, and `user`
    store.commit({
      type: "UPDATE_ACTIVE_DOCUMENT",
      // irrelevant details omitted
    });

    // diff current state and saved document (function details omitted)
    const activeAndSavedDocsMatch = diff(activeDocument, savedDocument);
    if (activeAndSavedDocsMatch) {
      store.commit("SET_DIRTY", false);
    }
  })
  .subscribe();