在 RxJS 中是否可以在 auditTime() 操作中测试未决值?
In RxJS is it possible to test for pending values in an auditTime() operation?
我正在使用 RxJS 的 .auditTime(500)
操作 (docs) 作为尾随节流阀:我想最多每 500 毫秒发出一次服务器调用。
服务器调用完成后的下游我需要查询是否有更多待处理的服务器调用,或者缓冲区现在是否已清除,以便我可以以状态消息的形式将此信息传达给用户,例如 "Saving…" 和 "Saved".
大致如下所示。
saveToServerObservable
.do(() => {
// gets called every time
setStatus(Status.SAVING);
})
.auditTime(500) // wait 500 ms and emit no more than once per 500 ms
.flatMap(data => axios({
method: "post",
url: "/saveurl",
data: data,
}))
.map(response => response.data)
.do(data => {
// here I want to know whether there are pending values from the
// auditTime() operation above or if the buffer is currently clear
const pendingSaves = ???;
if (!pendingSaves) {
setStatus(Status.SAVED);
}
})
.subscribe();
正如您在最后的 .do()
操作中看到的,我想知道是否有来自 .auditTime(500)
操作的未决值。我怎样才能实现这样的目标?
干杯!
我认为您可以使用 scan
并通过稍微修改您的链来实现您想要的:
const inc = new Subject();
const dec = new Subject();
const counter = Observable.merge(dec.mapTo(-1), inc.throttleTime(500).mapTo(1))
.scan((acc, val) => acc + val, 0)
.map(val => val > 0);
saveToServerObservable
.do(() => {
// gets called every time
setStatus(Status.SAVING);
inc.next();
})
.auditTime(500) // wait 500 ms and emit no more than once per 500 ms
.flatMap(data => axios({
method: "post",
url: "/saveurl",
data: data,
}))
.do(() => dec.next())
.map(response => response.data)
.withLatestFrom(counter, (data, pendingSaves) => {
if (!pendingSaves) {
setStatus(Status.SAVED);
}
})
.subscribe();
整个想法在 counter
Observable 中合并了 inc
和 dec
。这两个 Observables 使用 scan()
递增和递减计数器。
inc
也与 .throttleTime(500)
链接在一起,与 .auditTime(500)
完全相反,因为当您调用 setStatus(Status.SAVING);
时,您总是知道这会使 .auditTime(500)
发出一个项目,因此您可以立即增加计数器。
然后 withLatestFrom
只是将计数器与远程调用的结果合并,这是您可以检查 counter
.
的最新发射的地方
递增和递减计数器太容易出现错误,所以我最终采用了一种完全不同的方法。我现在单独跟踪本地数据是否 "dirty." 我使用这个脏信号向用户显示 "Saving…" vs "Saved" 消息:
- 每次用户进行本地编辑时,我在本地更新数据并将
dirty
设置为true
。
- 每次保存操作后,服务器都会用它拥有的最新版本的数据进行响应。
- 收到该响应后,我将本地版本的数据与服务器返回的数据进行比较,如果它们匹配,我将
dirty
设置为 false
。
在每次编辑时将 dirty
设置为 true
这里我为每次用户进行编辑定义了一个Rx.Subject
。每次收到信号,我就把dirty
设置为true
。
// stream of signals to save the active document
const userEditSignal$ = new Rx.Subject();
const savePrototype = () => {
userEditSignal$.next();
};
userEditSignal$.subscribe(() => {
// runs for each call to save the active document
store.commit("SET_DIRTY", true);
});
观察dirty
状态来决定何时保存到服务器
这让我们知道每次 dirty
值更改时,这与每次设置时都不一样。
const observeState = (store, getter) => {
// irrelevant details redacted
}
// emits only when `dirty` changes, not every time it's set
const shouldSaveToServer$ = observeState(store, state => state.dirty);
创建请求对象和服务器响应的流
此自定义计时逻辑取代了对 auditTime()
运算符的需要。
const saveToServerSignal$ = shouldSaveToServer$.switchMap(shouldSave => {
return shouldSave ?
// as long as we should save, save every 500 ms
Rx.Observable.interval(500) :
// when we should not, stop
Rx.Observable.never();
});
// create a request object for each save-to-server signal
const saveRequest$ = saveToServerSignal$
.mapTo(store.state.activeDocument)
.map(createSaveRequest);
//
const saveResponse$ = saveRequest$
// sends immediately
.flatMap(request => axios(request));
在每次响应时,检查本地文档和从服务器返回的版本的差异
如果他们同意,我们可以将dirty
设置为false
。
saveResponse$
.map(response => response.data)
.do(savedDocument => {
const activeDocument = store.state.activeDocument;
// update just `created`, `modified`, and `user`
store.commit({
type: "UPDATE_ACTIVE_DOCUMENT",
// irrelevant details omitted
});
// diff current state and saved document (function details omitted)
const activeAndSavedDocsMatch = diff(activeDocument, savedDocument);
if (activeAndSavedDocsMatch) {
store.commit("SET_DIRTY", false);
}
})
.subscribe();
我正在使用 RxJS 的 .auditTime(500)
操作 (docs) 作为尾随节流阀:我想最多每 500 毫秒发出一次服务器调用。
服务器调用完成后的下游我需要查询是否有更多待处理的服务器调用,或者缓冲区现在是否已清除,以便我可以以状态消息的形式将此信息传达给用户,例如 "Saving…" 和 "Saved".
大致如下所示。
saveToServerObservable
.do(() => {
// gets called every time
setStatus(Status.SAVING);
})
.auditTime(500) // wait 500 ms and emit no more than once per 500 ms
.flatMap(data => axios({
method: "post",
url: "/saveurl",
data: data,
}))
.map(response => response.data)
.do(data => {
// here I want to know whether there are pending values from the
// auditTime() operation above or if the buffer is currently clear
const pendingSaves = ???;
if (!pendingSaves) {
setStatus(Status.SAVED);
}
})
.subscribe();
正如您在最后的 .do()
操作中看到的,我想知道是否有来自 .auditTime(500)
操作的未决值。我怎样才能实现这样的目标?
干杯!
我认为您可以使用 scan
并通过稍微修改您的链来实现您想要的:
const inc = new Subject();
const dec = new Subject();
const counter = Observable.merge(dec.mapTo(-1), inc.throttleTime(500).mapTo(1))
.scan((acc, val) => acc + val, 0)
.map(val => val > 0);
saveToServerObservable
.do(() => {
// gets called every time
setStatus(Status.SAVING);
inc.next();
})
.auditTime(500) // wait 500 ms and emit no more than once per 500 ms
.flatMap(data => axios({
method: "post",
url: "/saveurl",
data: data,
}))
.do(() => dec.next())
.map(response => response.data)
.withLatestFrom(counter, (data, pendingSaves) => {
if (!pendingSaves) {
setStatus(Status.SAVED);
}
})
.subscribe();
整个想法在 counter
Observable 中合并了 inc
和 dec
。这两个 Observables 使用 scan()
递增和递减计数器。
inc
也与 .throttleTime(500)
链接在一起,与 .auditTime(500)
完全相反,因为当您调用 setStatus(Status.SAVING);
时,您总是知道这会使 .auditTime(500)
发出一个项目,因此您可以立即增加计数器。
然后 withLatestFrom
只是将计数器与远程调用的结果合并,这是您可以检查 counter
.
递增和递减计数器太容易出现错误,所以我最终采用了一种完全不同的方法。我现在单独跟踪本地数据是否 "dirty." 我使用这个脏信号向用户显示 "Saving…" vs "Saved" 消息:
- 每次用户进行本地编辑时,我在本地更新数据并将
dirty
设置为true
。 - 每次保存操作后,服务器都会用它拥有的最新版本的数据进行响应。
- 收到该响应后,我将本地版本的数据与服务器返回的数据进行比较,如果它们匹配,我将
dirty
设置为false
。
在每次编辑时将 dirty
设置为 true
这里我为每次用户进行编辑定义了一个Rx.Subject
。每次收到信号,我就把dirty
设置为true
。
// stream of signals to save the active document
const userEditSignal$ = new Rx.Subject();
const savePrototype = () => {
userEditSignal$.next();
};
userEditSignal$.subscribe(() => {
// runs for each call to save the active document
store.commit("SET_DIRTY", true);
});
观察dirty
状态来决定何时保存到服务器
这让我们知道每次 dirty
值更改时,这与每次设置时都不一样。
const observeState = (store, getter) => {
// irrelevant details redacted
}
// emits only when `dirty` changes, not every time it's set
const shouldSaveToServer$ = observeState(store, state => state.dirty);
创建请求对象和服务器响应的流
此自定义计时逻辑取代了对 auditTime()
运算符的需要。
const saveToServerSignal$ = shouldSaveToServer$.switchMap(shouldSave => {
return shouldSave ?
// as long as we should save, save every 500 ms
Rx.Observable.interval(500) :
// when we should not, stop
Rx.Observable.never();
});
// create a request object for each save-to-server signal
const saveRequest$ = saveToServerSignal$
.mapTo(store.state.activeDocument)
.map(createSaveRequest);
//
const saveResponse$ = saveRequest$
// sends immediately
.flatMap(request => axios(request));
在每次响应时,检查本地文档和从服务器返回的版本的差异
如果他们同意,我们可以将dirty
设置为false
。
saveResponse$
.map(response => response.data)
.do(savedDocument => {
const activeDocument = store.state.activeDocument;
// update just `created`, `modified`, and `user`
store.commit({
type: "UPDATE_ACTIVE_DOCUMENT",
// irrelevant details omitted
});
// diff current state and saved document (function details omitted)
const activeAndSavedDocsMatch = diff(activeDocument, savedDocument);
if (activeAndSavedDocsMatch) {
store.commit("SET_DIRTY", false);
}
})
.subscribe();