发出的每个列表项的 RxJava 延迟

RxJava delay for each item of list emitted

我正在努力实现一些我认为在 Rx 中会相当简单的东西。

我有一个项目列表,我想让每个项目都延迟发出。

似乎 Rx delay() 运算符只是将所有项目的发射移动了指定的延迟,而不是每个单独的项目。

这是一些测试代码。它将列表中的项目分组。然后每个组在被发射之前应该有一个延迟。

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .delay(50, TimeUnit.MILLISECONDS)
    .doOnNext(item -> {
        System.out.println(System.currentTimeMillis() - timeNow);
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();

结果是:

154ms
[5]

155ms
[2]

155ms
[1]

155ms
[3]

155ms
[4]

但我希望看到的是这样的:

174ms
[5]

230ms
[2]

285ms
[1]

345ms
[3]

399ms
[4]

我做错了什么?

我想你想要这个:

Observable.range(1, 5)
            .delay(50, TimeUnit.MILLISECONDS)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList())
            .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
            }).toList().toBlocking().first();

这样它会延迟进入组的号码,而不是将减少的列表延迟 5 秒。

一种方法是使用 zip to combine your observable with an Interval observable 来延迟输出。

Observable.zip(Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList()),
    Observable.interval(50, TimeUnit.MILLISECONDS),
    (obs, timer) -> obs)
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

要延迟每个组,您可以将 flatMap() 更改为 return 延迟发射组的 Observable。

Observable
        .range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g ->
                Observable
                        .timer(50, TimeUnit.MILLISECONDS)
                        .flatMap(t -> g.toList())
        )
        .doOnNext(item -> {
            System.out.println(System.currentTimeMillis() - timeNow);
            System.out.println(item);
            System.out.println(" ");
        }).toList().toBlocking().first();

您可以实现 custom rx operator such as MinRegularIntervalDelayOperator,然后将其与 lift 函数一起使用

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .lift(new MinRegularIntervalDelayOperator<Integer>(50L))
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

一种不太干净的方法是使用 .delay(Func1) 运算符使延迟随着迭代而变化。

Observable.range(1, 5)
            .delay(n -> n*50)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList())
            .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
            }).toList().toBlocking().first();

还有其他方法可以使用 concatMap 作为源项的 concatMap returns 可观察对象。所以我们可以在那个 observable 上添加延迟。

这是我尝试过的。

Observable.range(1, 5)
          .groupBy(n -> n % 5)
          .concatMap(integerIntegerGroupedObservable ->
          integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
          .doOnNext(item -> {
                    System.out.println(System.currentTimeMillis() - timeNow);
                    System.out.println(item);
                    System.out.println(" ");
                }).toList().toBlocking().first(); 

您可以使用 flatMap、maxConcurrent 和 delay() 在发出的项目之间添加延迟

这是一个示例 - 发出 0..4 延迟

@Test
fun testEmitWithDelays() {
    val DELAY = 500L
    val COUNT = 5

    val latch = CountDownLatch(1)
    val startMoment = System.currentTimeMillis()
    var endMoment : Long = 0

    Observable
        .range(0, COUNT)
        .flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
        .subscribe(
                { println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
                {},
                {
                    endMoment = System.currentTimeMillis()
                    latch.countDown()
                })

    latch.await()

    assertTrue { endMoment - startMoment >= DELAY * COUNT }
}

... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547

最简单的方法似乎就是使用 concatMap 并将每个项目包装在延迟的 Obserable 中。

long startTime = System.currentTimeMillis();
Observable.range(1, 5)
        .concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
        .doOnNext(i-> System.out.println(
                "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
        .toCompletable().await();

打印:

Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms

只是分享一个简单的方法,以间隔发出集合中的每个项目:

Observable.just(1,2,3,4,5)
    .zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
    .subscribe(System.out::println);

每个项目将每 500 毫秒发出一次

您应该可以通过使用 Timer 运算符来实现。我尝试使用 delay 但无法获得所需的输出。注意在 flatmap 运算符中完成的嵌套操作。

    Observable.range(1,5)
            .flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
                        .map(y -> x))
            // attach timestamp
            .timestamp()
            .subscribe(timedIntegers ->
                    Log.i(TAG, "Timed String: "
                            + timedIntegers.value()
                            + " "
                            + timedIntegers.time()));

我认为这正是您所需要的。看看:

long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
                .timestamp(TimeUnit.MILLISECONDS)
                .subscribe(emitTime -> {
                    System.out.println(emitTime.time() - startTime);
                });

您可以使用

   Observable.interval(1, TimeUnit.SECONDS)
            .map(new Function<Long, Integer>() {
                @Override
                public Integer apply(Long aLong) throws Exception {
                    return aLong.intValue() + 1;
                }
            })
            .startWith(0)
            .take(listInput.size())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer index) throws Exception {
                    Log.d(TAG, "---index of your list --" + index);
                }
            });

以上代码不重复值(索引)。 "I'm sure"

对于 kotlin 用户,我为 'zip with interval' 方法编写了一个扩展函数

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit

fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
    Observable.zip(
        this, 
        Observable.interval(interval, timeUnit), 
        BiFunction { item, _ -> item }
    )

它的工作方式相同,但这使得它可以重复使用。示例:

Observable.range(1, 5)
    .delayEach(1, TimeUnit.SECONDS)

在发出的每个项目之间引入延迟很有用:

List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));

Observable.fromIterable(letters)
                .concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
                        .take(1)
                        .map(second -> item))
                .subscribe(System.out::println);

https://github.com/ReactiveX/RxJava/issues/3505

有更多好的选择
Observable.just("A", "B", "C", "D", "E", "F")
    .flatMap { item -> Thread.sleep(2000)
        Observable.just( item ) }
    .subscribe { println( it ) }

本 post 中建议的两种方法的 Swift 扩展。

连接

import RxSwift

extension Observable {
    public func delayEach(_ dueTime: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
        return self.concatMap { Observable.just([=10=]).delay(dueTime, scheduler: scheduler) }
    }
}

Zip

import RxSwift

extension Observable {
    public func delayEach(_ period: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
        return Observable.zip(
            Observable<Int>.interval(period, scheduler: scheduler),
            self
        ) {  }
    }
}

用法

Observable.range(start: 1, count: 5)
    .delayEach(.seconds(1), scheduler: MainScheduler.instance)

我个人更喜欢 concat 方法,因为当上游以比延迟间隔慢的速度发出项目时,它也会按预期工作。

是的,原来的 post 是特定于 RxJava 的,但是 Google 也将您带到这里进行 RxSwift 查询。

关于 eis 的评论“想知道为什么没有任何答案真正回答问题。为什么这不起作用,有什么问题?”:

它的行为与预期的不同,因为延迟一个项目意味着它的发射时间被延迟相对于项目将被发射的时间——而不是相对于前一个项目。

想象一下 OP 的可观察性 没有 任何延迟:所有项目都快速连续发出(在同一毫秒内)。 具有 延迟,每个项目都会稍后发出。但由于对每个项目应用了相同的延迟,因此它们的相对发射时间不会改变。它们仍然在一毫秒内发出。

想象一个人在 14:00 进入房间。另一个人在 14:01 处进入。 如果您对两者应用一小时的延迟,它们将在 15:00 和 15:01 处进入。他们之间只差一分钟。