Perform an operation after each delay completes in RxJava
I am trying to simulate a delay while emitting items in a particular order.
Here is a simulation of the problem:
List<Integer> integers = new ArrayList<>();
integers.add(1);
integers.add(2);
integers.add(3);
integers.add(4);
Disposable d = Observable
        .just(integers)
        .flatMap(integers1 -> {
            return Observable
                    .zip(Observable.just(1L).concatWith(Observable.interval(10, 5, TimeUnit.SECONDS)),
                            Observable.fromIterable(integers1), (aLong, integer1) -> {
                                return new Pair<Long, Integer>(aLong, integer1);
                            })
                    .flatMap(longIntegerPair -> {
                        System.out.println("DATA " + longIntegerPair.getValue());
                        return Observable.just(longIntegerPair.getValue());
                    })
                    .toList()
                    .toObservable();
        })
        .subscribe(integers1 -> {
            System.out.println("END");
        }, throwable -> {
            System.out.println("Error " + throwable.getMessage());
        });
The output of the above code is:
DATA 1
wait for 10 seconds
DATA 2
wait for 5
DATA 3
wait for 5
DATA 4
wait for 5 min
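What I want is to perform an operation at each stage, once the 10-second or 5-second delay has finished, but I am not sure where in the current stream I should inject that step. The expected output is: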
DATA 1
wait for 10 seconds
[perform operation]
DATA 2
wait for 5
[perform operation]
DATA 3
wait for 5
[perform operation]
DATA 4
wait for 5 min
[perform operation]
Use concatMap to keep the sequence ordered, and use delay to postpone the processing of each item so that you get your printout pattern:
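Observable
        // The -1 marker tags the first pair so it can get the longer 10-second delay.
        .zip(Observable.just(-1L).concatWith(Observable.interval(10, 5, TimeUnit.SECONDS)),
                // Emits 1..5; use Observable.fromIterable(integers) for the list in the question.
                Observable.range(1, 5),
                (aLong, integer1) -> {
                    return new Pair<Long, Integer>(aLong, integer1);
                }
        )
        // concatMap subscribes to one inner Observable at a time, so order is preserved.
        .concatMap(longIntegerPair -> {
            System.out.println("DATA " + longIntegerPair.getValue());
            return Observable.just(longIntegerPair.getValue())
                    .delay(longIntegerPair.getKey() < 0 ? 10 : 5, TimeUnit.SECONDS)
                    // Runs only after this item's delay has elapsed.
                    .flatMap(value -> {
                        System.out.println("[performing operation] " + value);
                        return Observable.just(value);
                    });
        })
        .toList()
        .toObservable()
        .blockingSubscribe();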
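The same ordering can also be sketched without the zip/interval pair, by choosing the delay from the item's index and using doOnNext for the side effects. This is a variation on the snippet above, not the only way to do it. It assumes RxJava 3 package names (for RxJava 2.x the packages are io.reactivex and io.reactivex.schedulers); the class DelayThenOperate and the helper delayedPipeline are hypothetical names introduced here for illustration. A TestScheduler drives virtual time so the order can be checked without actually waiting 25 seconds.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.TestScheduler;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DelayThenOperate {

    // Hypothetical helper: reports "DATA n" when an item is picked up and
    // "[perform operation] n" once that item's delay has elapsed.
    static Observable<Integer> delayedPipeline(List<Integer> items,
                                               Consumer<String> sink,
                                               Scheduler scheduler) {
        return Observable.range(0, items.size())
                // concatMap starts the next inner Observable only after the
                // previous one has completed, which keeps the steps in order.
                .concatMap(index -> {
                    // Assumption taken from the question: 10 s for the first item, 5 s afterwards.
                    long delaySeconds = index == 0 ? 10 : 5;
                    return Observable.just(items.get(index))
                            .doOnNext(v -> sink.accept("DATA " + v))
                            .delay(delaySeconds, TimeUnit.SECONDS, scheduler)
                            .doOnNext(v -> sink.accept("[perform operation] " + v));
                });
    }

    public static void main(String[] args) {
        // Virtual time: nothing is delayed until advanceTimeBy is called.
        TestScheduler scheduler = new TestScheduler();
        delayedPipeline(Arrays.asList(1, 2, 3, 4), System.out::println, scheduler)
                .subscribe();
        // 10 + 5 + 5 + 5 seconds covers all four delays.
        scheduler.advanceTimeBy(25, TimeUnit.SECONDS);
    }
}
```

Run this way, the lines should come out as DATA 1, [perform operation] 1, DATA 2, [perform operation] 2, and so on through item 4. To get real delays, pass Schedulers.computation() instead of the TestScheduler and end the chain with blockingSubscribe() so the main thread waits for completion.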