异步获取数据,然后用RxJava串行处理数据
Get data asynchronously and then then process data serially with RxJava
我有一个函数可以创建一个负责获取一些数据的可观察对象。这个函数被多次调用,使得后台有N个线程在取数据。获取数据后,我必须连续处理它们。每轮处理应该在通知到达时开始,并且所有数据都被一个一个地处理(我们不关心顺序,或者在我们处理当前拥有的数据时是否还有 X 个后台线程仍在获取数据)。
目前我已经实施了一个解决方案,但它并不完全正确,因为 currentSubject 只是 returns 第一个值。
示例代码在这里:
package Test;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
public class Test {
public static void main(String[] args) throws InterruptedException {
var t = new Test();
var<String> currentSubject = PublishSubject.create();
var n = 4;
for (var i = 0; i < n; ++i) {
Observable.zip(
currentSubject,
t.getData(String.valueOf(i), ThreadLocalRandom.current().nextInt(1, 5)),
(s1, s2) -> s1 + " for " + s2)
.first("Default")
.doOnEvent(
(s, e) -> {
System.out.println("Processing result: " + s);
})
.subscribe();
}
for (var i = 0; i < n; ++i) {
Thread.sleep(i * 2000);
currentSubject.onNext("A notification: "+i);
}
System.out.println("End");
}
public Observable<String> getData(String s, int sec) {
return Observable.just(s)
.delay(sec, TimeUnit.SECONDS)
.doOnNext(
dt -> {
System.out.println("Got data: " + dt);
})
.subscribeOn(Schedulers.newThread());
}
}
我做错了什么,“currentSubject”只是 returns 第一个值?我每次都创建多个 Observables,这会导致问题吗?在内存中有多个 Observables 而它们应该被释放?最后,我怎样才能毫无问题地多次使用“currentSubject”?
我能够通过创建两个 PublishSubjects 来解决问题。一个负责在后台线程中获取数据,另一个在通知到达时调用。使用“zip”/“zipWith”运算符,仅当通知到达时才处理数据,一个接一个。
示例代码:
package Test;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
public class Test {
public static void main(String[] args) throws InterruptedException {
var t = new Test();
var<String> fetchDataSubject = PublishSubject.create();
var<String> notificationSubject = PublishSubject.create();
fetchDataSubject
.flatMap(
(e) -> {
return t.getData(String.valueOf(e), ThreadLocalRandom.current().nextInt(1,
4));
})
.zipWith(notificationSubject, (s1, s2) -> s1 + " for " + s2)
.observeOn(Schedulers.single())
.subscribe(
(s) -> {
System.out.println(
"Result: " + s + ", running on thread: " +
Thread.currentThread().getName());
},
(e) -> {
System.out.println("Error: " + e);
});
// Start fetching all data in background jobs
var n = 4;
for (var i = 0; i < n; i++) {
fetchDataSubject.onNext(String.valueOf(i));
}
// Send notifications
for (var i = 0; i < n; ++i) {
notificationSubject.onNext(" \"Notification " + i + "\"");
// Emulate notification arriving after the processing
Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10) * 1000);
}
Thread.sleep(40000);
}
public Observable<String> getData(String s, int sec) {
// Wait some time to fetch data on another thread
return Observable.just(s)
.delay(sec, TimeUnit.SECONDS)
.doOnNext(
dt -> {
System.out.println(
"Got data: "
+ dt
+ " after waiting: "
+ sec
+ "s, on thread "
+ Thread.currentThread().getName());
})
.subscribeOn(Schedulers.newThread());
}
}
我有一个函数可以创建一个负责获取一些数据的可观察对象。这个函数被多次调用,使得后台有N个线程在取数据。获取数据后,我必须连续处理它们。每轮处理应该在通知到达时开始,并且所有数据都被一个一个地处理(我们不关心顺序,或者在我们处理当前拥有的数据时是否还有 X 个后台线程仍在获取数据)。
目前我已经实施了一个解决方案,但它并不完全正确,因为 currentSubject 只是 returns 第一个值。
示例代码在这里:
package Test;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
public class Test {
public static void main(String[] args) throws InterruptedException {
var t = new Test();
var<String> currentSubject = PublishSubject.create();
var n = 4;
for (var i = 0; i < n; ++i) {
Observable.zip(
currentSubject,
t.getData(String.valueOf(i), ThreadLocalRandom.current().nextInt(1, 5)),
(s1, s2) -> s1 + " for " + s2)
.first("Default")
.doOnEvent(
(s, e) -> {
System.out.println("Processing result: " + s);
})
.subscribe();
}
for (var i = 0; i < n; ++i) {
Thread.sleep(i * 2000);
currentSubject.onNext("A notification: "+i);
}
System.out.println("End");
}
public Observable<String> getData(String s, int sec) {
return Observable.just(s)
.delay(sec, TimeUnit.SECONDS)
.doOnNext(
dt -> {
System.out.println("Got data: " + dt);
})
.subscribeOn(Schedulers.newThread());
}
}
我做错了什么,“currentSubject”只是 returns 第一个值?我每次都创建多个 Observables,这会导致问题吗?在内存中有多个 Observables 而它们应该被释放?最后,我怎样才能毫无问题地多次使用“currentSubject”?
我能够通过创建两个 PublishSubjects 来解决问题。一个负责在后台线程中获取数据,另一个在通知到达时调用。使用“zip”/“zipWith”运算符,仅当通知到达时才处理数据,一个接一个。
示例代码:
package Test;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
public class Test {
public static void main(String[] args) throws InterruptedException {
var t = new Test();
var<String> fetchDataSubject = PublishSubject.create();
var<String> notificationSubject = PublishSubject.create();
fetchDataSubject
.flatMap(
(e) -> {
return t.getData(String.valueOf(e), ThreadLocalRandom.current().nextInt(1,
4));
})
.zipWith(notificationSubject, (s1, s2) -> s1 + " for " + s2)
.observeOn(Schedulers.single())
.subscribe(
(s) -> {
System.out.println(
"Result: " + s + ", running on thread: " +
Thread.currentThread().getName());
},
(e) -> {
System.out.println("Error: " + e);
});
// Start fetching all data in background jobs
var n = 4;
for (var i = 0; i < n; i++) {
fetchDataSubject.onNext(String.valueOf(i));
}
// Send notifications
for (var i = 0; i < n; ++i) {
notificationSubject.onNext(" \"Notification " + i + "\"");
// Emulate notification arriving after the processing
Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10) * 1000);
}
Thread.sleep(40000);
}
public Observable<String> getData(String s, int sec) {
// Wait some time to fetch data on another thread
return Observable.just(s)
.delay(sec, TimeUnit.SECONDS)
.doOnNext(
dt -> {
System.out.println(
"Got data: "
+ dt
+ " after waiting: "
+ sec
+ "s, on thread "
+ Thread.currentThread().getName());
})
.subscribeOn(Schedulers.newThread());
}
}