Angular: how to debounce an Observable?
In my application, I have a service that returns an observable like this:
public genericService(params) {
    //Do some stuff
    //...
    return this.http.post('http://foo.com', params)
        .map((response) => {
            //Do some generic stuff
            //...
            return someData;
        })
        .catch((error: any) => {
            //Manage error in a generic way + do some generic stuff
            //...
            return Observable.throw(error);
        });
}
let debouncePointer = debounceObservable(genericService, 200);
public genericServiceDebounce(params) {
    return debouncePointer(params);
}
Now, somewhere else, I would like to call my function like this:
genericServiceDebounce(params)
.subscribe((response) => {
//Do some non-generic stuff
}, (error) => {
//Manage error in a non-generic way + do some non-generic stuff
});
But I have not managed to implement the debounceObservable() function.
I tried this implementation, based on the Promise equivalent (https://github.com/moszeed/es6-promise-debounce/blob/master/src/es6-promise-debounce.js):
debounceObservable(callback, delay, immediate?) {
    let timeout;
    return function () {
        let context = this, args = arguments;
        return Observable.create((observer) => {
            let later = function () {
                timeout = null;
                if(!immediate) {
                    observer.next(callback.apply(context, args));
                    //observer.onCompleted(); // don't know if this is needed
                }
            };
            let callNow = immediate && !timeout;
            clearTimeout(timeout);
            timeout = setTimeout(later, delay);
            if(callNow) {
                observer.next(callback.apply(context, args));
                //observer.onCompleted(); // don't know if this is needed
            }
        });
    }
}
But this does not work as expected.
With Promises, resolving with another promise (resolve(anotherPromise)) lets you call:
genericServiceDebounce().then(response => {
})
With Observables, calling observer.next(anotherObservable) emits a nested observable, which means you have to call:
genericServiceDebounce().subscribe(obs => {
obs.subscribe(response => {
})
})
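How would you implement the debounceObservable() function (in a way similar to the Promise version)?
Clarification 1: I found the Observable.debounce() function, but it debounces the observer, not the observable itself. I want to debounce the observable.
Clarification 2: I put the debounce on the service side because the service is a singleton and there are multiple callers. If I put it on the caller side, each caller would have its own debounce timer.
EDIT: Here is a snippet in which I try to explain my problem. Just click the different buttons to see the different behaviours (more explanation in the JS code comments).
Observable.debounce shows how .debounce() works in RxJS. It only outputs "3", but I want "1", "2", "3".
Observable.debounce x3 shows what happens if I call the code 3 times without wrapping my whole function in a debounce.
Observable wrapped x3 shows what I want to get. My whole function is wrapped, but if you look at the code, the subscribing part is fiddly.
Promise x3 shows how simple it is when using Promises.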
let log = (logValue) => {
const list = document.querySelector('#logs');
const li = document.createElement('li');
li.innerHTML = logValue;
list.appendChild(li);
}
/* ************************ */
/* WITH OBSERVABLE.DEBOUNCE */
/* ************************ */
let doStuffObservable = () => {
Rx.Observable.create((observer) => {
log('this should be called only one time (observable.debounce)');
setTimeout(() => {
observer.next('observable.debounce 1');
observer.next('observable.debounce 2');
observer.next('observable.debounce 3');
}, 1000);
})
.debounce(500)
.subscribe((response) => {
log(response);
}, (error) => {
log(error);
});
}
/* *********************************** */
/* WITH OBSERVABLE WRAPPED IN DEBOUNCE */
/* *********************************** */
let doStuffObservable2 = (param) => {
return Rx.Observable.create((observer) => {
log('this should be called only one time (observable wrapped)');
setTimeout(() => {
observer.next('observable wrapped ' + param);
}, 1000);
})
}
let debounceObservable = (callback, delay, immediate) => {
let timeout;
return function () {
let context = this, args = arguments;
return Rx.Observable.create((observer) => {
let later = function () {
timeout = null;
if(!immediate) {
observer.next(callback.apply(context, args));
}
};
let callNow = immediate && !timeout;
clearTimeout(timeout);
timeout = setTimeout(later, delay);
if(callNow) {
observer.next(callback.apply(context, args));
}
});
}
}
let doStuffObservable2Debounced = debounceObservable(doStuffObservable2);
/* ************* */
/* WITH PROMISES */
/* ************* */
let doStuffPromise = (param) => {
return new Promise((resolve, reject) => {
log('this should be called only one time (promise)');
setTimeout(() => {
resolve('promise ' + param);
}, 1000);
});
}
let debouncePromise = (callback, delay, immediate) => {
let timeout;
return function () {
let context = this, args = arguments;
return new Promise(function (resolve) {
let later = function () {
timeout = null;
if (!immediate) {
resolve(callback.apply(context, args));
}
};
let callNow = immediate && !timeout;
clearTimeout(timeout);
timeout = setTimeout(later, delay);
if (callNow) {
resolve(callback.apply(context, args));
}
});
}
}
/* ******* */
/* SAMPLES */
/* ******* */
function doObservableDebounce() {
doStuffObservable();
// result :
// this should be called only one time (observable.debounce)
// observable.debounce 3
// this is not what i want, i want all three values in output
}
function doObservableDebounce3Times() {
doStuffObservable();
doStuffObservable();
doStuffObservable();
// result :
// this should be called only one time (observable.debounce)
// this should be called only one time (observable.debounce)
// this should be called only one time (observable.debounce)
// observable.debounce 3
// observable.debounce 3
// observable.debounce 3
// this is bad
}
function doObservableWrappedDebounce3Times() {
doStuffObservable2Debounced(1)
.subscribe((response) => {
log(response);
response.subscribe((response2) => {
log(response2);
}, (error) => {
log(error);
})
}, (error) => {
log(error);
});
doStuffObservable2Debounced(2)
.subscribe((response) => {
log(response);
response.subscribe((response2) => {
log(response2);
}, (error) => {
log(error);
})
}, (error) => {
log(error);
});
doStuffObservable2Debounced(3)
.subscribe((response) => {
log(response);
response.subscribe((response2) => {
log(response2);
}, (error) => {
log(error);
})
}, (error) => {
log(error);
});
// result :
// AnonymousObservable { source: undefined, __subscribe: [Function] }
// this should be called only one time (observable wrapped)
// observable wrapped 3
// this is good but there are 2 embedded subscribe
}
function doPromiseDebounce3Times() {
let doStuffPromiseDebounced = debouncePromise(doStuffPromise);
doStuffPromiseDebounced(1).then(response => {
log(response);
})
doStuffPromiseDebounced(2).then(response => {
log(response);
})
doStuffPromiseDebounced(3).then(response => {
log(response);
})
// result :
// this should be called only one time (promise)
// promise 3
// this is perfect
}
<!DOCTYPE html>
<html>
<head>
<script data-require="rxjs@4.0.6" data-semver="4.0.6" src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
</head>
<body>
<button onclick='doObservableDebounce()'>Observable.debounce</button>
<button onclick='doObservableDebounce3Times()'>Observable.debounce x3</button>
<button onclick='doObservableWrappedDebounce3Times()'>Observable wrapped x3</button>
<button onclick='doPromiseDebounce3Times()'>Promise x3</button>
<ul id="logs"></ul>
</body>
</html>
OK, I think I found a way. What I should have done is replace:
observer.next(callback.apply(context, args));
with
callback.apply(context, args).subscribe((response) => {
observer.next(response)
}, (error) => {
observer.error(error);
});
Finally, it can be used like a classic observable:
debouncedObservable(1)
.subscribe((response) => {
log(response);
}, (error) => {
log(error);
});
Here is the snippet with the implementation:
let log = (logValue) => {
    const list = document.querySelector('#logs');
    const li = document.createElement('li');
    li.innerHTML = logValue;
    list.appendChild(li);
}
/* *********************************** */
/* WITH OBSERVABLE WRAPPED IN DEBOUNCE */
/* *********************************** */
let doStuffObservable = (param) => {
    return Rx.Observable.create((observer) => {
        log('this should be called only one time (observable wrapped)');
        setTimeout(() => {
            observer.next('observable wrapped ' + param);
        }, 1000);
    })
}
let debounceObservable = (callback, delay, immediate) => {
    let timeout;
    return function () {
        let context = this, args = arguments;
        return Rx.Observable.create((observer) => {
            let later = function () {
                timeout = null;
                if(!immediate) {
                    callback.apply(context, args).subscribe((response) => {
                        observer.next(response)
                    }, (error) => {
                        observer.error(error);
                    });
                }
            };
            let callNow = immediate && !timeout;
            clearTimeout(timeout);
            timeout = setTimeout(later, delay);
            if(callNow) {
                callback.apply(context, args).subscribe((response) => {
                    observer.next(response)
                }, (error) => {
                    observer.error(error);
                });
            }
        });
    }
}
let doStuffObservable2Debounced = debounceObservable(doStuffObservable);
/* ******* */
/* SAMPLES */
/* ******* */
function doObservableWrappedDebounce3Times() {
    doStuffObservable2Debounced(1)
        .subscribe((response) => {
            log(response);
        }, (error) => {
            log(error);
        });
    doStuffObservable2Debounced(2)
        .subscribe((response) => {
            log(response);
        }, (error) => {
            log(error);
        });
    doStuffObservable2Debounced(3)
        .subscribe((response) => {
            log(response);
        }, (error) => {
            log(error);
        });
}
<!DOCTYPE html>
<html>
<head>
<script data-require="rxjs@4.0.6" data-semver="4.0.6" src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
</head>
<body>
<button onclick='doObservableWrappedDebounce3Times()'>Observable wrapped x3</button>
<ul id="logs"></ul>
</body>
</html>
Please comment if you think I missed something.
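One gap in the snippet above: the wrapper never forwards completion, and unsubscribing does not cancel the pending timer or the inner subscription. Below is a minimal sketch of how that could look, not a definitive implementation. To keep it runnable without a library, `createObservable` is a hypothetical stand-in for `Rx.Observable.create`, the `immediate` option is left out, and the `timers` parameter is an assumption added so the timer can be swapped for a fake one in tests.

```javascript
// Hypothetical stand-in for Rx.Observable.create, so the sketch runs without a library.
const createObservable = (subscribeFn) => ({
  subscribe(next, error, complete) {
    let closed = false;
    const teardown = subscribeFn({
      next: (value) => { if (!closed) next(value); },
      error: (err) => { if (!closed) { closed = true; if (error) error(err); } },
      complete: () => { if (!closed) { closed = true; if (complete) complete(); } },
    });
    return {
      unsubscribe() {
        closed = true;
        if (typeof teardown === 'function') teardown();
      },
    };
  },
});

// Assumption: injectable timers, defaulting to the real ones.
const realTimers = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: (handle) => clearTimeout(handle),
};

// Trailing-edge debounce with one timer shared by every caller:
// only the last subscription made within `delay` actually runs `callback`.
const debounceObservable = (callback, delay, timers = realTimers) => {
  let timeout = null;
  return (...args) => createObservable((observer) => {
    let inner = null;
    timers.clear(timeout); // a newer subscription cancels the pending one
    const own = timers.set(() => {
      timeout = null;
      // Flatten: subscribe to the inner observable and forward everything,
      // including completion, instead of emitting the observable itself.
      inner = callback(...args).subscribe(
        (value) => observer.next(value),
        (err) => observer.error(err),
        () => observer.complete()
      );
    }, delay);
    timeout = own;
    // Teardown: cancel our own pending timer and the inner subscription.
    return () => {
      if (timeout === own) {
        timers.clear(own);
        timeout = null;
      }
      if (inner) inner.unsubscribe();
    };
  });
};
```

As with the original, callers whose call is superseded within the delay never receive a value or a completion; only the last subscriber does.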
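Sorry, I didn't get any notification of your reply to my comment.
A cleaner Rx-only solution to this problem is to treat your service calls as a stream of events, like so: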
constructor() {
    this._genericServiceCall$ = new ReplaySubject(1);
    this._genericServiceResult$ = this._genericServiceCall$
        .asObservable()
        .debounceTime(1000)
        .switchMap(params => this._genericService(params));
}
private _genericService(params) {
    //Do some stuff
    //...
    return this.http.post('http://foo.com', params)
        .map((response) => {
            //Do some generic stuff
            //...
            return someData;
        })
        .catch((error: any) => {
            //Manage error in a generic way + do some generic stuff
            //...
            return Observable.throw(error);
        });
}
public genericService(params) {
    this._genericServiceCall$.next(params);
    return this._genericServiceResult$; // Optionally add `.take(1)` so the observer has the expected behaviour of only getting 1 answer back
}
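Although I do see one issue... which params would you accept as the ones that must be passed to the private _genericService?
Anyway, do you follow what is going on here? Every time someone calls genericService(), it does not call the service right away. Instead, it emits a new value on _genericServiceCall$ and returns the _genericServiceResult$ stream. If we look at how that stream is defined, we see that it takes the debounced _genericServiceCall$ and then maps it to the service call. In theory it should work; I have not tried it.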
EDIT: Now I see it. You probably need to publish genericServiceResult to make it a hot observable, otherwise it will return as soon as any observer subscribes to it:
constructor() {
    this._genericServiceCall$ = new ReplaySubject(1);
    this._genericServiceResult$ = this._genericServiceCall$
        .asObservable()
        .debounceTime(1000)
        .switchMap(params => this._genericService(params))
        .publish();
    const subscription = this._genericServiceResult$.connect();
    // You must store subscription somewhere and dispose it when this object is destroyed - If it's a singleton service this might not be needed.
}
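To make the data flow described in this answer concrete, here is a library-free sketch (an illustration of the idea, not the Rx code itself): every call pushes its params, one shared timer debounces them, the service runs once with the latest params, and the result is broadcast to everyone who subscribed. `createDebouncedCall`, the callback-style `service(params, done)` signature, and the `timers` parameter are all hypothetical names introduced for this example.

```javascript
// Assumption: injectable timers, defaulting to the real ones.
const defaultTimers = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: (handle) => clearTimeout(handle),
};

// Hypothetical helper: treats calls as a stream of params with one shared debounce window.
const createDebouncedCall = (service, delay, timers = defaultTimers) => {
  const listeners = new Set(); // everyone listening to the shared ("hot") result stream
  let timeout = null;
  let generation = 0;          // lets a newer service call discard a stale result

  return (params) => {
    timers.clear(timeout);     // each new call restarts the shared debounce window
    timeout = timers.set(() => {
      timeout = null;
      const current = ++generation;
      service(params, (result) => {
        if (current !== generation) return; // superseded by a later service call
        listeners.forEach((listener) => listener(result));
      });
    }, delay);

    // Every caller gets a handle on the same result stream.
    return {
      subscribe(listener) {
        listeners.add(listener);
        return { unsubscribe: () => listeners.delete(listener) };
      },
    };
  };
};
```

Note the difference from the wrapper approach: here every caller inside the debounce window receives the result of the last call, which is roughly what publishing the result stream is meant to achieve. Listeners stay registered until they unsubscribe, so they also see later results.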