如何使用 RxJava2 从无限流 <Integer> 创建 Observable<Integer>?

How to create Observable<Integer> from infinite Stream<Integer> using RxJava2?

我正在尝试将 RxJava 从 1 升级到 2。在我的旧代码中,我有一个如下所示的方法:

private Observable<Integer> reversRange(int from, int to) {
    Stream<Integer> intStream = Stream.iterate(to, p -> p - 1);
    return Observable.from(() -> intStream.iterator())
            .takeWhile(n -> n > from)
            .map(n -> n );
}

但是现在在 RxJava 2 中我不能使用 from。什么相当于这段代码? 我在 中发现它是 fromIterable 但我不知道如何将它与 Stream 一起使用。

或其他示例,这不应该仅适用于范围,而是适用于任何无限流。

private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
    return Observable.from(() -> intStream.iterator());
}

如果你只有 StreamInteger 那么你可以简单地做:

Observable.fromIterable(IntStream.rangeClosed(from,to)
            .boxed()
            .collect(Collectors.toList()));  

rangedClosed 的参数包括在内。

您可以使用另一种通用方法,它更接近您尝试使用的方法:

Observable.fromIterable(Stream.iterate(from, integer -> integer + 1)
                .filter(integer -> integer < (to+1))
                .limit((long(to+1)-long(from))
                .collect(Collectors.toList()));  

EDIT1

如果你想要无限流。 Java Stream 的 generateiterate 都产生无限流。在我使用 iterate 的示例中(您可以使用 generate 将其替换为具有自定义对象创建代码的 Supplier )并删除所有终端操作符,例如 limit .
然后将它们包装到 ObservableFlowable 中,如果你想要 RxJava2 的背压支持,然后再包装到 Observable 中。

像这样:

Observable.just(Stream.generate(() -> // Object creation logic here));  

Observable.just(Flowable.just(Stream.generate(() -> // Object creation logic here)));  

请记住,如果您这样做,您的代码将无限期地创建对象并且您的程序将 运行 直到您的记忆 运行 耗尽。
我猜你有某种服务正在向你发送数据,你需要进行一些转换并将该数据作为流发送到其他地方。我建议将数据作为 Future 获取,然后将其包装到 Flowable 中,然后将数据流式传输到您要发送的任何地方。
喜欢:

Flowable.fromFuture(senderService.getDataAsCompletableFuture);  

然后指定背压策略。

EDIT2
您可以使用 Observable.generate() 来完成。
喜欢:

Observable.generate(() -> from, (value, emitter) -> {
        emitter.onNext(value);
        return value + 1; 
    });

使用generate()函数:

这是 kotlin 代码(扩展函数),但您只需稍微更改 lambda 即可。这适用于任何流。

fun <T> Stream<T>.toFlowable(): Flowable<T> {
  return Flowable.generate(
    Callable { iterator() },
    BiConsumer { ite, emitter ->
      if (ite.hasNext()) {
        emitter.onNext(ite.next())
      } else {
        emitter.onComplete()
      }
    }
  )
}

如果您愿意,也可以使用 Observable,但我不明白您为什么要这样做。

fun <T> Stream<T>.toObservable(): Observable<T> {
  return Observable.generate(
    Callable { iterator() },
    BiConsumer { ite, emitter ->
      if (ite.hasNext()) {
        emitter.onNext(ite.next())
      } else {
        emitter.onComplete()
      }
    }
  )
}

我认为 java 会是这样的:

public <T> Observable<T> streamToObservable(Stream<T> stream) {
  return Observable.generate(
    () -> stream.iterator(),
    (ite, emitter) -> {
      if (ite.hasNext()) {
        emitter.onNext(ite.next());
      } else {
        emitter.onComplete();
      }
    }
  );
}

因此您的代码将变为:

private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
    return streamToObservable(intStream);
}

How to create Observable from infinite Stream using RxJava2?

像这样:

    public static <T> Observable<T> streamToObservable(Stream<T> stream) {
        return Observable.fromIterable(stream::iterator);
    }

用法:



    public static void main(String[] args) {
        //Some infinite stream
        Stream<Integer> stream = Stream.iterate(1, (i)->i+1);

        //create an observable from the stream
        Observable<Integer> obs = streamToObservable(stream);
        
        //subscribe to the observable
        obs.subscribe((v) -> {
            System.out.println("Value from observable:"+v);
        });
        
    }

输出:

Value from observable 1
Value from observable 2
Value from observable 3
Value from observable 4
.........

This answer 解释了如何将 Stream 转换为 Iterable


RxJava3

//Some infinite stream
Stream<Integer> stream = Stream.iterate(1, (i)->i+1);

Observable<Integer> obs = Observable.fromStream(stream);