Get data asynchronously and then process it serially with RxJava

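I have a function that creates an Observable responsible for fetching some data. The function is called multiple times, so N threads are fetching data in the background. Once the data has been fetched, I have to process it serially. Each round of processing should start when a notification arrives, and all data should be processed one item at a time (we don't care about the order, or whether X background threads are still fetching data while we process what we currently have).

I have implemented a solution, but it is not quite right, because currentSubject only returns the first value.

Sample code: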

package Test;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;

public class Test {
  public static void main(String[] args) throws InterruptedException {
    var t = new Test();

    // `var` cannot take type arguments, so the element type is declared explicitly
    PublishSubject<String> currentSubject = PublishSubject.create();
    var n = 4;
    for (var i = 0; i < n; ++i) {
      Observable.zip(
              currentSubject,
              t.getData(String.valueOf(i), ThreadLocalRandom.current().nextInt(1, 5)),
              (s1, s2) -> s1 + " for " + s2)
          .first("Default")
          .doOnEvent(
              (s, e) -> {
                System.out.println("Processing result: " + s);
              })
          .subscribe();
    }

    for (var i = 0; i < n; ++i) {
      Thread.sleep(i * 2000);
      currentSubject.onNext("A notification: "+i);
    }
    System.out.println("End");
  }

  public Observable<String> getData(String s, int sec) {
    return Observable.just(s)
        .delay(sec, TimeUnit.SECONDS)
        .doOnNext(
            dt -> {
              System.out.println("Got data: " + dt);
            })
        .subscribeOn(Schedulers.newThread());
  }
}

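What am I doing wrong, such that "currentSubject" only returns the first value? I create multiple Observables each time; does that cause the problem? Are multiple Observables kept in memory when they should have been released? Finally, how can I use "currentSubject" multiple times without problems?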
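The symptom can be reproduced without any threads or delays, which may help narrow down the cause. The sketch below is a stripped-down assumption of what the loop above does, not the original program: the class `FirstValueSketch`, the helper `run()`, and the values `N0`/`N1` are made up for illustration. Every `zip` pipeline subscribes to the same `PublishSubject`, so each one receives the first notification, pairs it with its own data item, and is then disposed by `first(...)` before the second notification is emitted.

```java
import java.util.ArrayList;
import java.util.List;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;

public class FirstValueSketch {

  // Hypothetical helper: two zip pipelines share one subject, as in the loop above.
  static List<String> run() {
    List<String> results = new ArrayList<>();
    PublishSubject<String> notifications = PublishSubject.create();

    for (String data : List.of("0", "1")) {
      Observable.zip(notifications, Observable.just(data), (n, d) -> n + " for " + d)
          .first("Default") // takes the first pair, then disposes the pipeline
          .subscribe(s -> results.add(s));
    }

    notifications.onNext("N0"); // delivered to BOTH pipelines
    notifications.onNext("N1"); // nobody is subscribed any more
    return results;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [N0 for 0, N0 for 1]
  }
}
```

Under these assumptions, "N1" is never paired with anything, which matches the behaviour described in the question.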

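I was able to solve the problem by creating two PublishSubjects. One is responsible for fetching data on background threads, and the other is called when a notification arrives. With the "zip"/"zipWith" operator, data is processed only when a notification arrives, one item at a time.

Sample code: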

package Test;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;

public class Test {
  public static void main(String[] args) throws InterruptedException {
  var t = new Test();
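  // `var` cannot take type arguments, so the element types are declared explicitly
  PublishSubject<String> fetchDataSubject = PublishSubject.create();
  PublishSubject<String> notificationSubject = PublishSubject.create();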

  fetchDataSubject
      .flatMap(
          (e) -> {
            return t.getData(
                String.valueOf(e), ThreadLocalRandom.current().nextInt(1, 4));
        })
      .zipWith(notificationSubject, (s1, s2) -> s1 + " for " + s2)
      .observeOn(Schedulers.single())
      .subscribe(
          (s) -> {
            System.out.println(
                "Result: " + s + ", running on thread: " + 
                  Thread.currentThread().getName());
          },
          (e) -> {
            System.out.println("Error: " + e);
          });

  // Start fetching all data in background jobs
  var n = 4;
  for (var i = 0; i < n; i++) {
    fetchDataSubject.onNext(String.valueOf(i));
  }
  
  // Send notifications
  for (var i = 0; i < n; ++i) {
    notificationSubject.onNext(" \"Notification " + i + "\"");

    // Emulate notification arriving after the processing
    Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10) * 1000);
  }

  Thread.sleep(40000);
}

public Observable<String> getData(String s, int sec) {

  // Wait some time to fetch data on another thread
  return Observable.just(s)
      .delay(sec, TimeUnit.SECONDS)
      .doOnNext(
          dt -> {
            System.out.println(
                "Got data: "
                    + dt
                    + " after waiting: "
                    + sec
                    + "s, on  thread "
                    + Thread.currentThread().getName());
          })
      .subscribeOn(Schedulers.newThread());
  }
}
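
The pairing behaviour of "zipWith" that this solution relies on can be checked in isolation. The following is a minimal, single-threaded sketch rather than the code above: the class `ZipPairingSketch`, the helper `run()`, and the values `D0`..`D2` / `N0`..`N2` are hypothetical, and the background fetching is replaced by direct `onNext` calls so that the result is deterministic. It shows that data items arriving early are buffered and released one per notification.

```java
import java.util.ArrayList;
import java.util.List;

import io.reactivex.rxjava3.subjects.PublishSubject;

public class ZipPairingSketch {

  // Hypothetical helper: one pipeline pairs the i-th data item with the i-th notification.
  static List<String> run() {
    List<String> results = new ArrayList<>();
    PublishSubject<String> data = PublishSubject.create();
    PublishSubject<String> notifications = PublishSubject.create();

    data.zipWith(notifications, (d, n) -> d + " for " + n)
        .subscribe(s -> results.add(s));

    data.onNext("D0");          // buffered, no notification yet
    data.onNext("D1");          // buffered behind D0
    notifications.onNext("N0"); // releases "D0 for N0"
    data.onNext("D2");          // buffered behind D1
    notifications.onNext("N1"); // releases "D1 for N1"
    notifications.onNext("N2"); // releases "D2 for N2"
    return results;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [D0 for N0, D1 for N1, D2 for N2]
  }
}
```

Because there is a single subscription instead of one per data item, each notification is consumed exactly once. In the full solution, `observeOn(Schedulers.single())` additionally moves the processing of each pair onto one dedicated thread.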