如何使用 RxJava2 从无限流 <Integer> 创建 Observable<Integer>?
How to create Observable<Integer> from infinite Stream<Integer> using RxJava2?
我正在尝试将 RxJava 从 1 升级到 2。在我的旧代码中,我有一个如下所示的方法:
private Observable<Integer> reversRange(int from, int to) {
Stream<Integer> intStream = Stream.iterate(to, p -> p - 1);
return Observable.from(() -> intStream.iterator())
.takeWhile(n -> n > from)
.map(n -> n );
}
但是现在在 RxJava 2 中我不能使用 from
。什么相当于这段代码?
我在 中发现它是 fromIterable
但我不知道如何将它与 Stream 一起使用。
或其他示例,这不应该仅适用于范围,而是适用于任何无限流。
private Observable<Integer> numbers() {
Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
return Observable.from(() -> intStream.iterator());
}
如果你只有 Stream
的 Integer
那么你可以简单地做:
Observable.fromIterable(IntStream.rangeClosed(from,to)
.boxed()
.collect(Collectors.toList()));
rangedClosed
的参数包括在内。
您可以使用另一种通用方法,它更接近您尝试使用的方法:
Observable.fromIterable(Stream.iterate(from, integer -> integer + 1)
.filter(integer -> integer < (to+1))
.limit((long(to+1)-long(from))
.collect(Collectors.toList()));
EDIT1
如果你想要无限流。 Java Stream 的 generate
和 iterate
都产生无限流。在我使用 iterate
的示例中(您可以使用 generate
将其替换为具有自定义对象创建代码的 Supplier
)并删除所有终端操作符,例如 limit
.
然后将它们包装到 Observable
或 Flowable
中,如果你想要 RxJava2 的背压支持,然后再包装到 Observable
中。
像这样:
Observable.just(Stream.generate(() -> // Object creation logic here));
或
Observable.just(Flowable.just(Stream.generate(() -> // Object creation logic here)));
请记住,如果您这样做,您的代码将无限期地创建对象并且您的程序将 运行 直到您的记忆 运行 耗尽。
我猜你有某种服务正在向你发送数据,你需要进行一些转换并将该数据作为流发送到其他地方。我建议将数据作为 Future
获取,然后将其包装到 Flowable
中,然后将数据流式传输到您要发送的任何地方。
喜欢:
Flowable.fromFuture(senderService.getDataAsCompletableFuture);
然后指定背压策略。
EDIT2
您可以使用 Observable.generate()
来完成。
喜欢:
Observable.generate(() -> from, (value, emitter) -> {
emitter.onNext(value);
return value + 1;
});
使用generate()
函数:
这是 kotlin 代码(扩展函数),但您只需稍微更改 lambda 即可。这适用于任何流。
fun <T> Stream<T>.toFlowable(): Flowable<T> {
return Flowable.generate(
Callable { iterator() },
BiConsumer { ite, emitter ->
if (ite.hasNext()) {
emitter.onNext(ite.next())
} else {
emitter.onComplete()
}
}
)
}
如果您愿意,也可以使用 Observable
,但我不明白您为什么要这样做。
fun <T> Stream<T>.toObservable(): Observable<T> {
return Observable.generate(
Callable { iterator() },
BiConsumer { ite, emitter ->
if (ite.hasNext()) {
emitter.onNext(ite.next())
} else {
emitter.onComplete()
}
}
)
}
我认为 java 会是这样的:
public <T> Observable<T> streamToObservable(Stream<T> stream) {
return Observable.generate(
() -> stream.iterator(),
(ite, emitter) -> {
if (ite.hasNext()) {
emitter.onNext(ite.next());
} else {
emitter.onComplete();
}
}
);
}
因此您的代码将变为:
private Observable<Integer> numbers() {
Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
return streamToObservable(intStream);
}
How to create Observable from infinite Stream using RxJava2?
像这样:
public static <T> Observable<T> streamToObservable(Stream<T> stream) {
return Observable.fromIterable(stream::iterator);
}
用法:
public static void main(String[] args) {
//Some infinite stream
Stream<Integer> stream = Stream.iterate(1, (i)->i+1);
//create an observable from the stream
Observable<Integer> obs = streamToObservable(stream);
//subscribe to the observable
obs.subscribe((v) -> {
System.out.println("Value from observable:"+v);
});
}
输出:
Value from observable 1
Value from observable 2
Value from observable 3
Value from observable 4
.........
This answer 解释了如何将 Stream
转换为 Iterable
。
RxJava3
//Some infinite stream
Stream<Integer> stream = Stream.iterate(1, (i)->i+1);
Observable<Integer> obs = Observable.fromStream(stream);
我正在尝试将 RxJava 从 1 升级到 2。在我的旧代码中,我有一个如下所示的方法:
private Observable<Integer> reversRange(int from, int to) {
Stream<Integer> intStream = Stream.iterate(to, p -> p - 1);
return Observable.from(() -> intStream.iterator())
.takeWhile(n -> n > from)
.map(n -> n );
}
但是现在在 RxJava 2 中我不能使用 from
。什么相当于这段代码?
我在 fromIterable
但我不知道如何将它与 Stream 一起使用。
或其他示例,这不应该仅适用于范围,而是适用于任何无限流。
private Observable<Integer> numbers() {
Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
return Observable.from(() -> intStream.iterator());
}
如果你只有 Stream
的 Integer
那么你可以简单地做:
Observable.fromIterable(IntStream.rangeClosed(from,to)
.boxed()
.collect(Collectors.toList()));
rangedClosed
的参数包括在内。
您可以使用另一种通用方法,它更接近您尝试使用的方法:
Observable.fromIterable(Stream.iterate(from, integer -> integer + 1)
.filter(integer -> integer < (to+1))
.limit((long(to+1)-long(from))
.collect(Collectors.toList()));
EDIT1
如果你想要无限流。 Java Stream 的 generate
和 iterate
都产生无限流。在我使用 iterate
的示例中(您可以使用 generate
将其替换为具有自定义对象创建代码的 Supplier
)并删除所有终端操作符,例如 limit
.
然后将它们包装到 Observable
或 Flowable
中,如果你想要 RxJava2 的背压支持,然后再包装到 Observable
中。
像这样:
Observable.just(Stream.generate(() -> // Object creation logic here));
或
Observable.just(Flowable.just(Stream.generate(() -> // Object creation logic here)));
请记住,如果您这样做,您的代码将无限期地创建对象并且您的程序将 运行 直到您的记忆 运行 耗尽。
我猜你有某种服务正在向你发送数据,你需要进行一些转换并将该数据作为流发送到其他地方。我建议将数据作为 Future
获取,然后将其包装到 Flowable
中,然后将数据流式传输到您要发送的任何地方。
喜欢:
Flowable.fromFuture(senderService.getDataAsCompletableFuture);
然后指定背压策略。
EDIT2
您可以使用 Observable.generate()
来完成。
喜欢:
Observable.generate(() -> from, (value, emitter) -> {
emitter.onNext(value);
return value + 1;
});
使用generate()
函数:
这是 kotlin 代码(扩展函数),但您只需稍微更改 lambda 即可。这适用于任何流。
fun <T> Stream<T>.toFlowable(): Flowable<T> {
return Flowable.generate(
Callable { iterator() },
BiConsumer { ite, emitter ->
if (ite.hasNext()) {
emitter.onNext(ite.next())
} else {
emitter.onComplete()
}
}
)
}
如果您愿意,也可以使用 Observable
,但我不明白您为什么要这样做。
fun <T> Stream<T>.toObservable(): Observable<T> {
return Observable.generate(
Callable { iterator() },
BiConsumer { ite, emitter ->
if (ite.hasNext()) {
emitter.onNext(ite.next())
} else {
emitter.onComplete()
}
}
)
}
我认为 java 会是这样的:
public <T> Observable<T> streamToObservable(Stream<T> stream) {
return Observable.generate(
() -> stream.iterator(),
(ite, emitter) -> {
if (ite.hasNext()) {
emitter.onNext(ite.next());
} else {
emitter.onComplete();
}
}
);
}
因此您的代码将变为:
private Observable<Integer> numbers() {
Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
return streamToObservable(intStream);
}
How to create Observable from infinite Stream using RxJava2?
像这样:
public static <T> Observable<T> streamToObservable(Stream<T> stream) {
return Observable.fromIterable(stream::iterator);
}
用法:
public static void main(String[] args) {
//Some infinite stream
Stream<Integer> stream = Stream.iterate(1, (i)->i+1);
//create an observable from the stream
Observable<Integer> obs = streamToObservable(stream);
//subscribe to the observable
obs.subscribe((v) -> {
System.out.println("Value from observable:"+v);
});
}
输出:
Value from observable 1
Value from observable 2
Value from observable 3
Value from observable 4
.........
This answer 解释了如何将 Stream
转换为 Iterable
。
RxJava3
//Some infinite stream
Stream<Integer> stream = Stream.iterate(1, (i)->i+1);
Observable<Integer> obs = Observable.fromStream(stream);