RxJava 2:每个 Observable 都会调用一次 onComplete 方法
RxJava 2: onComplete method is called for every Observable
我是 RxJava 新手。我需要以异步方式执行一些任务,并在所有任务完成后获得一次回调。我在回调方法中加了一些 Log.d,发现 onComplete(以及 onNext)方法会在每个任务完成时都执行一次,但这不是我想要的行为。
此外,一旦调用了 dispose 方法,我就无法再提交新任务,因为线程不会启动;我只能把持有这些 RxJava 方法的类的引用置为 null,再创建一个新实例。
P.S. 请避免使用 lambda 表达式。
这是我的代码:
/**
 * Computes file checksums in the background with RxJava 2 and reports progress and
 * completion to a {@code CommunicationInterface}.
 *
 * <p>Every URI passed to {@link #addWorks} gets its own Observable/observer pair. A shared
 * counter identifies the last terminal callback of the batch so that the "all done"
 * notification is delivered once per batch rather than once per job.
 */
public class Async2 {
    /** Subscriptions of the batch currently in flight. */
    private final CompositeDisposable disposables = new CompositeDisposable();

    /** Results of the current batch; appended to from the observers' {@code onNext}. */
    private final ArrayList<FileRepresentation> fileRepresentationList = new ArrayList<>();

    public Async2() {
    }

    /**
     * Returns the live result list (not a copy); {@link #addWorks} clears and refills it.
     */
    public ArrayList<FileRepresentation> getFileRepresentationList() {
        return fileRepresentationList;
    }

    /**
     * Disposes every subscription held by this instance.
     *
     * <p>NOTE(review): per the RxJava 2 documentation, {@code CompositeDisposable.dispose()} is
     * terminal: disposables added afterwards are disposed immediately. Jobs submitted through
     * {@link #addWorks} after this call are therefore cancelled as soon as they are registered.
     */
    public void dispose() {
        disposables.dispose();
    }

    /**
     * Builds a cold Observable that computes the checksum of one URI when subscribed.
     *
     * @param uri     the file to hash
     * @param context passed through to {@code FileUtils.calcolaChecksumFromUri}
     * @return an Observable emitting the single {@code FileRepresentation} for {@code uri}
     */
    public Observable<FileRepresentation> calcObservable(Uri uri, Context context) {
        return Observable.defer(new Callable<ObservableSource<? extends FileRepresentation>>() {
            @Override
            public ObservableSource<? extends FileRepresentation> call() {
                // The long-running job; defer() postpones it until subscription time.
                FileRepresentation fileRepresentation = FileUtils.calcolaChecksumFromUri(uri, context);
                Log.d("test-0X", fileRepresentation.nome);
                Log.d("test-0X", fileRepresentation.hash);
                Log.d("Thread name: ", Thread.currentThread().getName());
                return Observable.just(fileRepresentation);
            }
        });
    }

    /**
     * Starts one checksum job per URI and notifies {@code com} when the whole batch is finished.
     *
     * <p>Any previous batch is cleared first. Jobs are subscribed on {@code Schedulers.single()}
     * and observed on the Android main thread.
     *
     * @param uriList files to process; an empty list completes the batch immediately
     * @param context passed through to {@link #calcObservable}
     * @param com     receives progress-bar, progress and completion callbacks
     */
    public void addWorks(List<Uri> uriList, Context context, CommunicationInterface com) {
        fileRepresentationList.clear();
        int nObservable = uriList.size();
        AtomicInteger remainings = new AtomicInteger(nObservable);
        disposables.clear();
        com.enableProgressBar();
        Disposable[] disposableArr = new Disposable[nObservable];
        Log.d("addworks", "addWorks method (nObservable var): " + nObservable);
        Log.d("addworks", "addWorks method (disposable.size() ): " + disposables.size());
        for (int i = 0; i < nObservable; i++) {
            disposableArr[i] = calcObservable(uriList.get(i), context)
                    // Run on a background thread
                    .subscribeOn(Schedulers.single())
                    // Be notified on the main thread
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(newJobObserver(remainings, com));
        }
        disposables.addAll(disposableArr);
        Log.d("addworks", "addWorks method (disposable.size() ): " + disposables.size());
        if (nObservable == 0) {
            // With nothing to run no observer will ever fire, so close the batch here;
            // otherwise the progress bar would stay enabled forever.
            com.disableProgressBar();
            com.notifyCompletion();
        }
    }

    /** Creates the observer for a single job; the job that brings the counter to zero ends the batch. */
    private DisposableObserver<FileRepresentation> newJobObserver(
            final AtomicInteger remainings, final CommunicationInterface com) {
        return new DisposableObserver<FileRepresentation>() {
            @Override
            public void onComplete() {
                if (remainings.decrementAndGet() == 0) {
                    Log.d("Method onComplete called", "elementi lista: " + fileRepresentationList.size());
                    Log.d("Method onComplete called", "End!!");
                    com.disableProgressBar();
                    com.notifyCompletion();
                }
                com.updateProgress();
                Log.d("Method onComplete called", "End!!");
            }

            @Override
            public void onError(Throwable e) {
                // A failed job still counts towards the batch so the batch can finish.
                if (remainings.decrementAndGet() == 0) {
                    Log.d("Method onError", "elementi lista: " + fileRepresentationList.size());
                    Log.d("Method onError", "End!!");
                    com.disableProgressBar();
                    com.notifyCompletion();
                }
                com.updateProgress();
                // Log the cause instead of silently dropping it.
                Log.e("method onError", "method onError called", e);
            }

            @Override
            public void onNext(FileRepresentation value) {
                fileRepresentationList.add(value);
            }
        };
    }
}
我是这样开始工作的:
// Obtain the result list from async2 (presumably an Async2 instance); the getter returns the
// same list that the observers' onNext callbacks append to.
ArrayList<FileRepresentation> li = async2.getFileRepresentationList();
您不必创建 N 个 observable 和观察者,只需从列表中创建一个流:
// One stream over the whole list: a single observer, so onComplete/onError fire once for the
// batch instead of once per URI.
disposables.add(
        Observable.fromIterable(uriList)
                .subscribeOn(Schedulers.single())
                .flatMap(new Function<Uri, Observable<FileRepresentation>>() {
                    @Override
                    public Observable<FileRepresentation> apply(Uri uri) {
                        return calcObservable(uri, context);
                    }
                }, /* delayErrors */ true)
                // delayError = true keeps event order across the thread hop. Plain observeOn lets
                // the (already delayed) onError cut ahead of queued onNext items and drop them.
                .observeOn(AndroidSchedulers.mainThread(), /* delayError */ true)
                .subscribeWith(new DisposableObserver<FileRepresentation>() {
                    @Override
                    public void onComplete() {
                        Log.d("Method onComplete called", "elementi lista: " + fileRepresentationList.size());
                        Log.d("Method onComplete called", "End!!");
                        com.disableProgressBar();
                        com.notifyCompletion();
                        Log.d("Method onComplete called", "End!!");
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Delivered once, after every job has terminated (flatMap delays errors).
                        Log.d("Method onError", "elementi lista: " + fileRepresentationList.size());
                        Log.d("Method onError", "End!!");
                        com.disableProgressBar();
                        com.notifyCompletion();
                        // Log the cause instead of silently dropping it.
                        Log.e("method onError", "method onError called", e);
                    }

                    @Override
                    public void onNext(FileRepresentation value) {
                        // One call per successfully processed URI.
                        fileRepresentationList.add(value);
                        com.updateProgress();
                    }
                })
);
我是 RxJava 新手。我需要以异步方式执行一些任务,并在所有任务完成后获得一次回调。我在回调方法中加了一些 Log.d,发现 onComplete(以及 onNext)方法会在每个任务完成时都执行一次,但这不是我想要的行为。此外,一旦调用了 dispose 方法,我就无法再提交新任务,因为线程不会启动;我只能把持有这些 RxJava 方法的类的引用置为 null,再创建一个新实例。
P.S. 请避免使用 lambda 表达式。
这是我的代码:
/**
 * Computes file checksums in the background with RxJava 2 and reports progress and
 * completion to a {@code CommunicationInterface}.
 *
 * <p>Every URI passed to {@link #addWorks} gets its own Observable/observer pair. A shared
 * counter identifies the last terminal callback of the batch so that the "all done"
 * notification is delivered once per batch rather than once per job.
 */
public class Async2 {
    /** Subscriptions of the batch currently in flight. */
    private final CompositeDisposable disposables = new CompositeDisposable();

    /** Results of the current batch; appended to from the observers' {@code onNext}. */
    private final ArrayList<FileRepresentation> fileRepresentationList = new ArrayList<>();

    public Async2() {
    }

    /**
     * Returns the live result list (not a copy); {@link #addWorks} clears and refills it.
     */
    public ArrayList<FileRepresentation> getFileRepresentationList() {
        return fileRepresentationList;
    }

    /**
     * Disposes every subscription held by this instance.
     *
     * <p>NOTE(review): per the RxJava 2 documentation, {@code CompositeDisposable.dispose()} is
     * terminal: disposables added afterwards are disposed immediately. Jobs submitted through
     * {@link #addWorks} after this call are therefore cancelled as soon as they are registered.
     */
    public void dispose() {
        disposables.dispose();
    }

    /**
     * Builds a cold Observable that computes the checksum of one URI when subscribed.
     *
     * @param uri     the file to hash
     * @param context passed through to {@code FileUtils.calcolaChecksumFromUri}
     * @return an Observable emitting the single {@code FileRepresentation} for {@code uri}
     */
    public Observable<FileRepresentation> calcObservable(Uri uri, Context context) {
        return Observable.defer(new Callable<ObservableSource<? extends FileRepresentation>>() {
            @Override
            public ObservableSource<? extends FileRepresentation> call() {
                // The long-running job; defer() postpones it until subscription time.
                FileRepresentation fileRepresentation = FileUtils.calcolaChecksumFromUri(uri, context);
                Log.d("test-0X", fileRepresentation.nome);
                Log.d("test-0X", fileRepresentation.hash);
                Log.d("Thread name: ", Thread.currentThread().getName());
                return Observable.just(fileRepresentation);
            }
        });
    }

    /**
     * Starts one checksum job per URI and notifies {@code com} when the whole batch is finished.
     *
     * <p>Any previous batch is cleared first. Jobs are subscribed on {@code Schedulers.single()}
     * and observed on the Android main thread.
     *
     * @param uriList files to process; an empty list completes the batch immediately
     * @param context passed through to {@link #calcObservable}
     * @param com     receives progress-bar, progress and completion callbacks
     */
    public void addWorks(List<Uri> uriList, Context context, CommunicationInterface com) {
        fileRepresentationList.clear();
        int nObservable = uriList.size();
        AtomicInteger remainings = new AtomicInteger(nObservable);
        disposables.clear();
        com.enableProgressBar();
        Disposable[] disposableArr = new Disposable[nObservable];
        Log.d("addworks", "addWorks method (nObservable var): " + nObservable);
        Log.d("addworks", "addWorks method (disposable.size() ): " + disposables.size());
        for (int i = 0; i < nObservable; i++) {
            disposableArr[i] = calcObservable(uriList.get(i), context)
                    // Run on a background thread
                    .subscribeOn(Schedulers.single())
                    // Be notified on the main thread
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(newJobObserver(remainings, com));
        }
        disposables.addAll(disposableArr);
        Log.d("addworks", "addWorks method (disposable.size() ): " + disposables.size());
        if (nObservable == 0) {
            // With nothing to run no observer will ever fire, so close the batch here;
            // otherwise the progress bar would stay enabled forever.
            com.disableProgressBar();
            com.notifyCompletion();
        }
    }

    /** Creates the observer for a single job; the job that brings the counter to zero ends the batch. */
    private DisposableObserver<FileRepresentation> newJobObserver(
            final AtomicInteger remainings, final CommunicationInterface com) {
        return new DisposableObserver<FileRepresentation>() {
            @Override
            public void onComplete() {
                if (remainings.decrementAndGet() == 0) {
                    Log.d("Method onComplete called", "elementi lista: " + fileRepresentationList.size());
                    Log.d("Method onComplete called", "End!!");
                    com.disableProgressBar();
                    com.notifyCompletion();
                }
                com.updateProgress();
                Log.d("Method onComplete called", "End!!");
            }

            @Override
            public void onError(Throwable e) {
                // A failed job still counts towards the batch so the batch can finish.
                if (remainings.decrementAndGet() == 0) {
                    Log.d("Method onError", "elementi lista: " + fileRepresentationList.size());
                    Log.d("Method onError", "End!!");
                    com.disableProgressBar();
                    com.notifyCompletion();
                }
                com.updateProgress();
                // Log the cause instead of silently dropping it.
                Log.e("method onError", "method onError called", e);
            }

            @Override
            public void onNext(FileRepresentation value) {
                fileRepresentationList.add(value);
            }
        };
    }
}
我是这样开始工作的:
// Obtain the result list from async2 (presumably an Async2 instance); the getter returns the
// same list that the observers' onNext callbacks append to.
ArrayList<FileRepresentation> li = async2.getFileRepresentationList();
您不必创建 N 个 observable 和观察者,只需从列表中创建一个流:
// One stream over the whole list: a single observer, so onComplete/onError fire once for the
// batch instead of once per URI.
disposables.add(
        Observable.fromIterable(uriList)
                .subscribeOn(Schedulers.single())
                .flatMap(new Function<Uri, Observable<FileRepresentation>>() {
                    @Override
                    public Observable<FileRepresentation> apply(Uri uri) {
                        return calcObservable(uri, context);
                    }
                }, /* delayErrors */ true)
                // delayError = true keeps event order across the thread hop. Plain observeOn lets
                // the (already delayed) onError cut ahead of queued onNext items and drop them.
                .observeOn(AndroidSchedulers.mainThread(), /* delayError */ true)
                .subscribeWith(new DisposableObserver<FileRepresentation>() {
                    @Override
                    public void onComplete() {
                        Log.d("Method onComplete called", "elementi lista: " + fileRepresentationList.size());
                        Log.d("Method onComplete called", "End!!");
                        com.disableProgressBar();
                        com.notifyCompletion();
                        Log.d("Method onComplete called", "End!!");
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Delivered once, after every job has terminated (flatMap delays errors).
                        Log.d("Method onError", "elementi lista: " + fileRepresentationList.size());
                        Log.d("Method onError", "End!!");
                        com.disableProgressBar();
                        com.notifyCompletion();
                        // Log the cause instead of silently dropping it.
                        Log.e("method onError", "method onError called", e);
                    }

                    @Override
                    public void onNext(FileRepresentation value) {
                        // One call per successfully processed URI.
                        fileRepresentationList.add(value);
                        com.updateProgress();
                    }
                })
);