Observable.just(doSomeLongStuff()) 运行 doSomeLongStuff() 在我订阅 observable 之前
Observable.just(doSomeLongStuff()) run doSomeLongStuff() before I subscribe to observable
我对 RxJava2 有一些愚蠢的问题。
我需要同时 运行 两个长操作。我知道我应该使用 Observable.zip() 并且我使用了它。
我的长操作的问题是一个接一个 运行 另一个问题是我的长操作在我订阅它们之前就开始了。
让我们想象一下,这是我应该 运行 异步的长时间操作。
private String doSomethingLong() {
Random rand = new Random();
int value = rand.nextInt(5);
Timber.i("Do something for [%d] sec [%s]", value, Thread.currentThread().getName());
try {
Thread.sleep(value * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
return String.format(Locale.getDefault(), "Exception [%s]", e.getMessage());
}
return String.format(Locale.getDefault(),"Job for [%d] seconds", value);
}
并且让有一个像 test() 这样的方法来尝试使其并行:
public void test() {
final long started = System.currentTimeMillis();
Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
Observable.zip(just1, just2, new Func2<String, String, Combined>() {
@Override
public Combined call(String s, String s2) {
return new Combined(s, s2);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Combined combined) {
long total = System.currentTimeMillis() - started;
Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
}
});
}
当我尝试 运行 时,我观察到两个可观察值 just1 和 just2 运行 一个接一个……这让我很困惑……
但是还有另一个工作人员让我更加困惑...我评论 Observable.zip 并注意到 just1 和 just2 在我订阅它们之前启动了方法 doSomethingLong()...
让我展示一下:
public void test() {
final long started = System.currentTimeMillis();
Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
// Observable.zip(just1, just2, new Func2<String, String, Combined>() {
// @Override
// public Combined call(String s, String s2) {
// return new Combined(s, s2);
// }
// }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
// @Override
// public void onCompleted() {
//
// }
//
// @Override
// public void onError(Throwable e) {
//
// }
//
// @Override
// public void onNext(Combined combined) {
// long total = System.currentTimeMillis() - started;
// Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
// }
// });
}
这段代码几乎是一样的 - 它是 运行 两次 doSomethingLong() 一个接一个...
我的期望:
1. 我需要 doSomethingLong() 方法 运行 并行
2. 在我开始订阅之前,我要求解释为什么这些方法 运行s。
3.这种情况我应该怎么写好我的代码。我希望在订阅它们之前不要调用 doSomethingLong() 方法。
非常感谢。希望我把问题解释清楚。
将您的 Observable 创建为 Observable.fromCallable{}。
而不是 zip 使用 combineLatest()
文档:
http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#fromCallable-java.util.concurrent.Callable-
http://reactivex.io/documentation/operators/combinelatest.html
当您订阅时,Observable.just
不会 运行 任何东西。它会在您订阅时发出元素,但您的 doSomethingLong
将在您将其作为参数传递后立即 运行 。这很正常,这就是语言的工作方式。
您正在寻找的是一种在我们订阅时说 return 的方式,但当时也只 运行 并且希望在后台线程上。
对此有几个答案,这里是一些:
使用延迟
有一个名为 defer
的运算符,它接受一个将在您订阅后执行的 lambda:
Observable.defer(() -> doSomethingLong())
这只会在您订阅时执行doSomethingLong
使用 fromCallable
您可以从 lambda 创建一个可观察对象。这被称为 fromCallable
:
Observable.fromCallable(() -> doSomethingLong())
同样,这只会在您订阅运行 doSomethingLong
时
使用创建
我认为这可能是最令人气馁的做法,因为有几件事你必须处理,但我认为为了完整性,可以提一下:
Observable.create( emitter -> {
if(emitter.isDisposed()) return;
emitter.onNext(doSomethingLong());
emitter.onComplete();
});
同样,我确信有更多方法可以做到这一点。我只是想解释一下这个问题并提供一些选择。
我对 RxJava2 有一些愚蠢的问题。
我需要同时 运行 两个长操作。我知道我应该使用 Observable.zip() 并且我使用了它。
我的长操作的问题是一个接一个 运行 另一个问题是我的长操作在我订阅它们之前就开始了。
让我们想象一下,这是我应该 运行 异步的长时间操作。
private String doSomethingLong() {
Random rand = new Random();
int value = rand.nextInt(5);
Timber.i("Do something for [%d] sec [%s]", value, Thread.currentThread().getName());
try {
Thread.sleep(value * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
return String.format(Locale.getDefault(), "Exception [%s]", e.getMessage());
}
return String.format(Locale.getDefault(),"Job for [%d] seconds", value);
}
并且让有一个像 test() 这样的方法来尝试使其并行:
public void test() {
final long started = System.currentTimeMillis();
Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
Observable.zip(just1, just2, new Func2<String, String, Combined>() {
@Override
public Combined call(String s, String s2) {
return new Combined(s, s2);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Combined combined) {
long total = System.currentTimeMillis() - started;
Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
}
});
}
当我尝试 运行 时,我观察到两个可观察值 just1 和 just2 运行 一个接一个……这让我很困惑……
但是还有另一个工作人员让我更加困惑...我评论 Observable.zip 并注意到 just1 和 just2 在我订阅它们之前启动了方法 doSomethingLong()...
让我展示一下:
public void test() {
final long started = System.currentTimeMillis();
Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
// Observable.zip(just1, just2, new Func2<String, String, Combined>() {
// @Override
// public Combined call(String s, String s2) {
// return new Combined(s, s2);
// }
// }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
// @Override
// public void onCompleted() {
//
// }
//
// @Override
// public void onError(Throwable e) {
//
// }
//
// @Override
// public void onNext(Combined combined) {
// long total = System.currentTimeMillis() - started;
// Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
// }
// });
}
这段代码几乎是一样的 - 它是 运行 两次 doSomethingLong() 一个接一个...
我的期望: 1. 我需要 doSomethingLong() 方法 运行 并行 2. 在我开始订阅之前,我要求解释为什么这些方法 运行s。 3.这种情况我应该怎么写好我的代码。我希望在订阅它们之前不要调用 doSomethingLong() 方法。
非常感谢。希望我把问题解释清楚。
将您的 Observable 创建为 Observable.fromCallable{}。 而不是 zip 使用 combineLatest()
文档: http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#fromCallable-java.util.concurrent.Callable- http://reactivex.io/documentation/operators/combinelatest.html
Observable.just
不会 运行 任何东西。它会在您订阅时发出元素,但您的 doSomethingLong
将在您将其作为参数传递后立即 运行 。这很正常,这就是语言的工作方式。
您正在寻找的是一种在我们订阅时说 return 的方式,但当时也只 运行 并且希望在后台线程上。
对此有几个答案,这里是一些:
使用延迟
有一个名为 defer
的运算符,它接受一个将在您订阅后执行的 lambda:
Observable.defer(() -> doSomethingLong())
这只会在您订阅时执行doSomethingLong
使用 fromCallable
您可以从 lambda 创建一个可观察对象。这被称为 fromCallable
:
Observable.fromCallable(() -> doSomethingLong())
同样,这只会在您订阅运行 doSomethingLong
时
使用创建
我认为这可能是最令人气馁的做法,因为有几件事你必须处理,但我认为为了完整性,可以提一下:
Observable.create( emitter -> {
if(emitter.isDisposed()) return;
emitter.onNext(doSomethingLong());
emitter.onComplete();
});
同样,我确信有更多方法可以做到这一点。我只是想解释一下这个问题并提供一些选择。