立即交付第一个项目,'debounce' 个后续项目
Deliver the first item immediately, 'debounce' following items
考虑以下用例:
- 需要尽快交付第一件商品
- 需要 去抖动 跟随 1 秒超时的事件
我最终实现了基于 OperatorDebounceWithTime
的自定义运算符,然后像这样使用它
.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))
CustomOperatorDebounceWithTime
立即交付第一个项目,然后使用 OperatorDebounceWithTime
运算符的逻辑对后面的项目 去抖动 。
是否有更简单的方法来实现所描述的行为?让我们跳过 compose
运算符,它不能解决问题。我正在寻找一种无需实现自定义运算符即可实现此目的的方法。
更新:
根据@lopar 的评论,更好的方法是:
Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))
像这样的东西行得通吗:
String[] items = {"one", "two", "three", "four", "five", "six", "seven", "eight"};
Observable<String> myObservable = Observable.from(items);
Observable.concat(myObservable.first(), myObservable.skip(1).debounce(1, TimeUnit.SECONDS))
.subscribe(s -> System.out.println(s));
@LortRaydenMK 和@lopar 的答案是最好的,但我想提出一些其他建议,以防它碰巧对你或处于类似情况的人更有效。
有一个 debounce()
的变体,它采用一个函数来决定该特定项目的去抖时间。它通过 returning 一个在一段时间后完成的可观察对象来指定这一点。您的函数可以 return empty()
用于第一项,而 timer()
用于其余项。像(未经测试):
String[] items = {"one", "two", "three", "four", "five", "six"};
Observable.from(items)
.debounce(item -> item.equals("one")
? Observable.empty()
: Observable.timer(1, TimeUnit.SECONDS));
诀窍是这个函数必须知道哪个项目是第一个。您的序列可能知道这一点。如果没有,您可能需要 zip()
和 range()
或其他。在这种情况下最好使用其他答案中的解决方案。
一个使用 RxJava 2.0 的简单解决方案,翻译自 ,它结合了 throttleFirst 和 debounce,然后删除重复项。
private <T> ObservableTransformer<T, T> debounceImmediate() {
return observable -> observable.publish(p ->
Observable.merge(p.throttleFirst(1, TimeUnit.SECONDS),
p.debounce(1, TimeUnit.SECONDS)).distinctUntilChanged());
}
@Test
public void testDebounceImmediate() {
Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000)
.flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
.doOnNext(v -> System.out.println(LocalDateTime.now() + " T=" + v))
.compose(debounceImmediate())
.blockingSubscribe(v -> System.out.println(LocalDateTime.now() + " Debounced: " + v));
}
使用 limit() 或 take() 的方法似乎无法处理长期存在的数据流,我可能想要持续观察,但仍会立即对一段时间内看到的第一个事件采取行动。
使用带有函数的debounce
版本并以这种方式实现函数:
.debounce(new Func1<String, Observable<String>>() {
private AtomicBoolean isFirstEmission = new AtomicBoolean(true);
@Override
public Observable<String> call(String s) {
// note: standard debounce causes the first item to be
// delayed by 1 second unnecessarily, this is a workaround
if (isFirstEmission.getAndSet(false)) {
return Observable.just(s);
} else {
return Observable.just(s).delay(1, TimeUnit.SECONDS);
}
}
})
第一项立即发出。后续项目延迟一秒钟。如果一个延迟的可观察对象在下一个项目到达之前没有终止,它就会被取消,所以预期的去抖动行为得到满足。
有一个问题:你总是丢失第二个项目。我想之前没有人意识到这一点,因为如果你有去抖动,你通常会有很多事件,而第二个事件无论如何都会随着去抖动而被丢弃。永不输球的正确方法是:
observable
.publish(published ->
published
.limit(1)
.concatWith(published.debounce(1, TimeUnit.SECONDS)));
别担心,您不会得到任何重复的事件。如果您不确定,可以 运行 此代码并自行检查:
Observable.just(1, 2, 3, 4)
.publish(published ->
published
.limit(1)
.concatWith(published))
.subscribe(System.out::println);
Ngrx - rxjs 解决方案,将管道一分为二
onMyAction$ = this.actions$
.pipe(ofType<any>(ActionTypes.MY_ACTION);
lastTime = new Date();
@Effect()
onMyActionWithAbort$ = this.onMyAction$
.pipe(
filter((data) => {
const result = new Date() - this.lastTime > 200;
this.lastTime = new Date();
return result;
}),
switchMap(this.DoTheJob.bind(this))
);
@Effect()
onMyActionWithDebounce$ = this.onMyAction$
.pipe(
debounceTime(200),
filter(this.preventDuplicateFilter.bind(this)),
switchMap(this.DoTheJob.bind(this))
);
基于@lopar 评论的 Kotlin 扩展函数:
fun <T> Flowable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Flowable<T> {
return publish {
it.take(1).concatWith(it.debounce(timeout, unit))
}
}
fun <T> Observable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Observable<T> {
return publish {
it.take(1).concatWith(it.debounce(timeout, unit))
}
}
防止双重订阅
使用这个:
const debouncedSkipFirstStream$ = stream$.pipe(
map((it, index) => ({ it, index })),
debounce(({ index }) => (
index ? new Promise(res => setTimeout(res, TimeUnit.SECONDS))
: Rx.of(true))),
map(({ it }) => it),
);
如果使用拆分解决方案,您将看到 'run' 打印两次
x = rxjs.Observable.create(o=>{
console.info('run');
o.next(1);
o.next(2);
});
a = x.pipe(rxjs.operators.take(1));
b = x.pipe(rxjs.operators.skip(1), rxjs.operators.debounceTime(60));
rxjs.concat(a, b).subscribe(console.log);
我的 Dart 解决方案:
extension StreamExt<T> on Stream<T> {
Stream<T> immediateDebounce(Duration duration) {
var lastEmit = 0;
return debounce((event) {
if (_now - lastEmit < duration.inMilliseconds) {
lastEmit = _now;
return Stream.value(event).delay(duration);
} else {
lastEmit = _now;
return Stream.value(event);
}
});
}
}
int get _now => DateTime.now().millisecondsSinceEpoch;
我和
一起去了
Flowable.concat(
flowable // emits immediately
.take(1)
.skipWhile { it.isEmpty() },
flowable // same flowable, but emits with delay and debounce
.debounce(2, TimeUnit.SECONDS)
)
.distinctUntilChanged()
如果有人在 2021 年寻找这个:
@OptIn(FlowPreview::class)
fun <T> Flow<T>.debounceImmediate(timeMillis: Long): Flow<T> =
withIndex()
.onEach { if (it.index != 0) delay(timeMillis) }
.map { it.value }
用法:
authRepository.login(loginDto)
.debounceImmediate(10000)
阅读 this article 后,我最终使用 throttleLatest
运算符获得与我正在寻找的即时去抖非常相似的行为。
以下代码将立即发出第一个项目,然后每 500 毫秒检查一次新项目。只有在 500 毫秒内收到的最新事件 window 才会被发送出去。
observable.throttleLatest(500, TimeUnit.MILLISECONDS)
view.clicks()
.throttleFirst(2, TimeUnit.SECONDS)
.subscribe {
println("Clicked button")
}
我发现这是最简单的方法。 clicks() 来自 rx 视图绑定。添加此依赖项以获取可观察的视图
implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0
对于那些试图使用 Kotlin Flow 解决同样问题的人:
fun <T> Flow<T>.throttleFirst(timeout: Duration): Flow<T> {
var job = Job().apply { complete() }
return onCompletion { job.cancel() }.run {
flow {
coroutineScope {
collect { value ->
if (!job.isActive) {
emit(value)
job = launch { delay(timeout.inWholeMilliseconds) }
}
}
}
}
}
}
示例:
flow {
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
}.throttleFirst(1.seconds).collect { ... }
// 1, 4, 5
考虑以下用例:
- 需要尽快交付第一件商品
- 需要 去抖动 跟随 1 秒超时的事件
我最终实现了基于 OperatorDebounceWithTime
的自定义运算符,然后像这样使用它
.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))
CustomOperatorDebounceWithTime
立即交付第一个项目,然后使用 OperatorDebounceWithTime
运算符的逻辑对后面的项目 去抖动 。
是否有更简单的方法来实现所描述的行为?让我们跳过 compose
运算符,它不能解决问题。我正在寻找一种无需实现自定义运算符即可实现此目的的方法。
更新:
根据@lopar 的评论,更好的方法是:
Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))
像这样的东西行得通吗:
String[] items = {"one", "two", "three", "four", "five", "six", "seven", "eight"};
Observable<String> myObservable = Observable.from(items);
Observable.concat(myObservable.first(), myObservable.skip(1).debounce(1, TimeUnit.SECONDS))
.subscribe(s -> System.out.println(s));
@LortRaydenMK 和@lopar 的答案是最好的,但我想提出一些其他建议,以防它碰巧对你或处于类似情况的人更有效。
有一个 debounce()
的变体,它采用一个函数来决定该特定项目的去抖时间。它通过 returning 一个在一段时间后完成的可观察对象来指定这一点。您的函数可以 return empty()
用于第一项,而 timer()
用于其余项。像(未经测试):
String[] items = {"one", "two", "three", "four", "five", "six"};
Observable.from(items)
.debounce(item -> item.equals("one")
? Observable.empty()
: Observable.timer(1, TimeUnit.SECONDS));
诀窍是这个函数必须知道哪个项目是第一个。您的序列可能知道这一点。如果没有,您可能需要 zip()
和 range()
或其他。在这种情况下最好使用其他答案中的解决方案。
一个使用 RxJava 2.0 的简单解决方案,翻译自
private <T> ObservableTransformer<T, T> debounceImmediate() {
return observable -> observable.publish(p ->
Observable.merge(p.throttleFirst(1, TimeUnit.SECONDS),
p.debounce(1, TimeUnit.SECONDS)).distinctUntilChanged());
}
@Test
public void testDebounceImmediate() {
Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000)
.flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
.doOnNext(v -> System.out.println(LocalDateTime.now() + " T=" + v))
.compose(debounceImmediate())
.blockingSubscribe(v -> System.out.println(LocalDateTime.now() + " Debounced: " + v));
}
使用 limit() 或 take() 的方法似乎无法处理长期存在的数据流,我可能想要持续观察,但仍会立即对一段时间内看到的第一个事件采取行动。
使用带有函数的debounce
版本并以这种方式实现函数:
.debounce(new Func1<String, Observable<String>>() {
private AtomicBoolean isFirstEmission = new AtomicBoolean(true);
@Override
public Observable<String> call(String s) {
// note: standard debounce causes the first item to be
// delayed by 1 second unnecessarily, this is a workaround
if (isFirstEmission.getAndSet(false)) {
return Observable.just(s);
} else {
return Observable.just(s).delay(1, TimeUnit.SECONDS);
}
}
})
第一项立即发出。后续项目延迟一秒钟。如果一个延迟的可观察对象在下一个项目到达之前没有终止,它就会被取消,所以预期的去抖动行为得到满足。
observable
.publish(published ->
published
.limit(1)
.concatWith(published.debounce(1, TimeUnit.SECONDS)));
别担心,您不会得到任何重复的事件。如果您不确定,可以 运行 此代码并自行检查:
Observable.just(1, 2, 3, 4)
.publish(published ->
published
.limit(1)
.concatWith(published))
.subscribe(System.out::println);
Ngrx - rxjs 解决方案,将管道一分为二
onMyAction$ = this.actions$
.pipe(ofType<any>(ActionTypes.MY_ACTION);
lastTime = new Date();
@Effect()
onMyActionWithAbort$ = this.onMyAction$
.pipe(
filter((data) => {
const result = new Date() - this.lastTime > 200;
this.lastTime = new Date();
return result;
}),
switchMap(this.DoTheJob.bind(this))
);
@Effect()
onMyActionWithDebounce$ = this.onMyAction$
.pipe(
debounceTime(200),
filter(this.preventDuplicateFilter.bind(this)),
switchMap(this.DoTheJob.bind(this))
);
基于@lopar 评论的 Kotlin 扩展函数:
fun <T> Flowable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Flowable<T> {
return publish {
it.take(1).concatWith(it.debounce(timeout, unit))
}
}
fun <T> Observable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Observable<T> {
return publish {
it.take(1).concatWith(it.debounce(timeout, unit))
}
}
防止双重订阅 使用这个:
const debouncedSkipFirstStream$ = stream$.pipe(
map((it, index) => ({ it, index })),
debounce(({ index }) => (
index ? new Promise(res => setTimeout(res, TimeUnit.SECONDS))
: Rx.of(true))),
map(({ it }) => it),
);
如果使用拆分解决方案,您将看到 'run' 打印两次
x = rxjs.Observable.create(o=>{
console.info('run');
o.next(1);
o.next(2);
});
a = x.pipe(rxjs.operators.take(1));
b = x.pipe(rxjs.operators.skip(1), rxjs.operators.debounceTime(60));
rxjs.concat(a, b).subscribe(console.log);
我的 Dart 解决方案:
extension StreamExt<T> on Stream<T> {
Stream<T> immediateDebounce(Duration duration) {
var lastEmit = 0;
return debounce((event) {
if (_now - lastEmit < duration.inMilliseconds) {
lastEmit = _now;
return Stream.value(event).delay(duration);
} else {
lastEmit = _now;
return Stream.value(event);
}
});
}
}
int get _now => DateTime.now().millisecondsSinceEpoch;
我和
一起去了Flowable.concat(
flowable // emits immediately
.take(1)
.skipWhile { it.isEmpty() },
flowable // same flowable, but emits with delay and debounce
.debounce(2, TimeUnit.SECONDS)
)
.distinctUntilChanged()
如果有人在 2021 年寻找这个:
@OptIn(FlowPreview::class)
fun <T> Flow<T>.debounceImmediate(timeMillis: Long): Flow<T> =
withIndex()
.onEach { if (it.index != 0) delay(timeMillis) }
.map { it.value }
用法:
authRepository.login(loginDto)
.debounceImmediate(10000)
阅读 this article 后,我最终使用 throttleLatest
运算符获得与我正在寻找的即时去抖非常相似的行为。
以下代码将立即发出第一个项目,然后每 500 毫秒检查一次新项目。只有在 500 毫秒内收到的最新事件 window 才会被发送出去。
observable.throttleLatest(500, TimeUnit.MILLISECONDS)
view.clicks()
.throttleFirst(2, TimeUnit.SECONDS)
.subscribe {
println("Clicked button")
}
我发现这是最简单的方法。 clicks() 来自 rx 视图绑定。添加此依赖项以获取可观察的视图
implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0
对于那些试图使用 Kotlin Flow 解决同样问题的人:
fun <T> Flow<T>.throttleFirst(timeout: Duration): Flow<T> {
var job = Job().apply { complete() }
return onCompletion { job.cancel() }.run {
flow {
coroutineScope {
collect { value ->
if (!job.isActive) {
emit(value)
job = launch { delay(timeout.inWholeMilliseconds) }
}
}
}
}
}
}
示例:
flow {
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
}.throttleFirst(1.seconds).collect { ... }
// 1, 4, 5