如何在 Android RxJava Observable 中按顺序 运行 2 个查询?
How to run 2 queries sequentially in a Android RxJava Observable?
我想要 运行 2 个异步任务,一个接着另一个(按顺序)。我看过一些关于 ZIP 或 Flat 的东西,但我不是很了解...
我的目的是从本地 SQLite 加载数据,完成后,它将查询调用到服务器(远程)。
有人可以建议我实现该目标的方法吗?
这是我正在使用的 RxJava Observable 骨架(单个任务):
// RxJava Observable
Observable.OnSubscribe<Object> onSubscribe = subscriber -> {
try {
// Do the query or long task...
subscriber.onNext(object);
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
};
// RxJava Observer
Subscriber<Object> subscriber = new Subscriber<Object>() {
@Override
public void onCompleted() {
// Handle the completion
}
@Override
public void onError(Throwable e) {
// Handle the error
}
@Override
public void onNext(Object result) {
// Handle the result
}
};
Observable.create(onSubscribe)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
执行此操作的运算符是 merge
,请参阅 http://reactivex.io/documentation/operators/merge.html。
我的方法是创建两个可观察对象,比方说 observableLocal
和 observableRemote
,然后合并输出:
Observable<Object> observableLocal = Observable.create(...)
Observable<Object> observableRemote = Observable.create(...)
Observable.merge(observableLocal, observableRemote)
.subscribe(subscriber)
如果你想确保远程在本地之后运行,你可以使用concat
。
如果查询不相互依赖,Lukas Batteau 的回答是最好的。但是,如果您需要从本地 SQLite 查询 获取数据,然后 您 运行 远程查询(例如,您需要远程查询参数的数据或 headers) 那么你可以从本地 observable 开始,然后将其平面映射以组合两个 observables 在 你从本地查询中获取数据后:
Observable<Object> localObservable = Observable.create(...)
localObservable.flatMap(object ->
{
return Observable.zip(Observable.just(object), *create remote observable here*,
(localObservable, remoteObservable) ->
{
*combining function*
});
}).subscribe(subscriber);
flatmap 函数允许您通过 zip 函数将本地 observable 转换为本地和远程 observable 的组合。重申一下,这里的优点是两个可观察量是顺序的,并且 zip 函数只会在两个依赖可观察量 运行.
之后 运行
此外,即使底层 objects 具有不同的类型,zip 函数也允许您组合可观察对象。在这种情况下,您提供一个组合函数作为第三个参数。如果底层数据是同一类型,则将zip函数替换为merge。
您可以尝试我的解决方案,有多种方法可以解决您的问题。
为了确保它正常工作,我创建了一个独立的工作示例并使用此 API 进行测试:https://jsonplaceholder.typicode.com/posts/1
private final Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://jsonplaceholder.typicode.com/posts/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
private final RestPostsService restPostsService = retrofit.create(RestPostsService.class);
private Observable<Posts> getPostById(int id) {
return restPostsService.getPostsById(id);
}
RestPostService.java
package app.com.rxretrofit;
import retrofit2.http.GET;
import retrofit2.http.Path;
import rx.Observable;
/**
* -> Created by Think-Twice-Code-Once on 11/26/2017.
*/
public interface RestPostsService {
@GET("{id}")
Observable<Posts> getPostsById(@Path("id") int id);
}
解决方案1: 顺序调用多个任务时使用,前面任务的结果总是输入下一个任务
getPostById(1)
.concatMap(posts1 -> {
//get post 1 success
return getPostById(posts1.getId() + 1);
})
.concatMap(posts2 -> {
//get post 2 success
return getPostById(posts2.getId() + 1);
})
.concatMap(posts3 -> {
//get post 3success
return getPostById(posts3.getId() + 1);
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(finalPosts -> {
//get post 4 success
Toast.makeText(this, "Final result: " + finalPosts.getId() + " - " + finalPosts.getTitle(),
Toast.LENGTH_LONG).show();
});
解决方案2: 顺序调用多个任务时使用,前一个任务的所有结果作为输入最终任务(例如:上传头像和封面图片后,调用 api 使用这些图片 URL 创建新用户):
Observable
.zip(getPostById(1), getPostById(2), getPostById(3), (posts1, posts2, posts3) -> {
//this method defines how to zip all separate results into one
return posts1.getId() + posts2.getId() + posts3.getId();
})
.flatMap(finalPostId -> {
//after get all first three posts, get the final posts,
// the final posts-id is sum of these posts-id
return getPostById(finalPostId);
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(finalPosts -> {
Toast.makeText(this, "Final posts: " + finalPosts.getId() + " - " + finalPosts.getTitle(),
Toast.LENGTH_SHORT).show();
});
AndroidManifest
<uses-permission android:name="android.permission.INTERNET"/>
root build.gradle
// Top-level build file where you can add configuration options common to all sub-projects/modules.
buildscript {
repositories {
jcenter()
}
dependencies {
classpath 'com.android.tools.build:gradle:2.3.3'
classpath 'me.tatarka:gradle-retrolambda:3.2.0'
classpath 'me.tatarka.retrolambda.projectlombok:lombok.ast:0.2.3.a2'
// NOTE: Do not place your application dependencies here; they belong
// in the individual module build.gradle files
}
// Exclude the version that the android plugin depends on.
configurations.classpath.exclude group: 'com.android.tools.external.lombok'
}
allprojects {
repositories {
jcenter()
}
}
task clean(type: Delete) {
delete rootProject.buildDir
}
app/build.gradle
apply plugin: 'me.tatarka.retrolambda'
apply plugin: 'com.android.application'
android {
compileSdkVersion 26
buildToolsVersion "26.0.1"
defaultConfig {
applicationId "app.com.rxretrofit"
minSdkVersion 15
targetSdkVersion 26
versionCode 1
versionName "1.0"
testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
}
buildTypes {
release {
minifyEnabled false
proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
}
}
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
dependencies {
compile fileTree(dir: 'libs', include: ['*.jar'])
androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
exclude group: 'com.android.support', module: 'support-annotations'
})
compile 'com.android.support:appcompat-v7:26.+'
compile 'com.android.support.constraint:constraint-layout:1.0.2'
testCompile 'junit:junit:4.12'
provided 'org.projectlombok:lombok:1.16.6'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:converter-gson:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
}
型号
package app.com.rxretrofit;
import com.google.gson.annotations.SerializedName;
/**
* -> Created by Think-Twice-Code-Once on 11/26/2017.
*/
public class Posts {
@SerializedName("userId")
private int userId;
@SerializedName("id")
private int id;
@SerializedName("title")
private String title;
@SerializedName("body")
private String body;
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
}
顺便说一下,使用 Rx + Retrofit + Dagger + MVP 模式 是一个很好的组合。
我想要 运行 2 个异步任务,一个接着另一个(按顺序)。我看过一些关于 ZIP 或 Flat 的东西,但我不是很了解...
我的目的是从本地 SQLite 加载数据,完成后,它将查询调用到服务器(远程)。
有人可以建议我实现该目标的方法吗?
这是我正在使用的 RxJava Observable 骨架(单个任务):
// RxJava Observable
Observable.OnSubscribe<Object> onSubscribe = subscriber -> {
try {
// Do the query or long task...
subscriber.onNext(object);
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
};
// RxJava Observer
Subscriber<Object> subscriber = new Subscriber<Object>() {
@Override
public void onCompleted() {
// Handle the completion
}
@Override
public void onError(Throwable e) {
// Handle the error
}
@Override
public void onNext(Object result) {
// Handle the result
}
};
Observable.create(onSubscribe)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
执行此操作的运算符是 merge
,请参阅 http://reactivex.io/documentation/operators/merge.html。
我的方法是创建两个可观察对象,比方说 observableLocal
和 observableRemote
,然后合并输出:
Observable<Object> observableLocal = Observable.create(...)
Observable<Object> observableRemote = Observable.create(...)
Observable.merge(observableLocal, observableRemote)
.subscribe(subscriber)
如果你想确保远程在本地之后运行,你可以使用concat
。
如果查询不相互依赖,Lukas Batteau 的回答是最好的。但是,如果您需要从本地 SQLite 查询 获取数据,然后 您 运行 远程查询(例如,您需要远程查询参数的数据或 headers) 那么你可以从本地 observable 开始,然后将其平面映射以组合两个 observables 在 你从本地查询中获取数据后:
Observable<Object> localObservable = Observable.create(...)
localObservable.flatMap(object ->
{
return Observable.zip(Observable.just(object), *create remote observable here*,
(localObservable, remoteObservable) ->
{
*combining function*
});
}).subscribe(subscriber);
flatmap 函数允许您通过 zip 函数将本地 observable 转换为本地和远程 observable 的组合。重申一下,这里的优点是两个可观察量是顺序的,并且 zip 函数只会在两个依赖可观察量 运行.
之后 运行此外,即使底层 objects 具有不同的类型,zip 函数也允许您组合可观察对象。在这种情况下,您提供一个组合函数作为第三个参数。如果底层数据是同一类型,则将zip函数替换为merge。
您可以尝试我的解决方案,有多种方法可以解决您的问题。
为了确保它正常工作,我创建了一个独立的工作示例并使用此 API 进行测试:https://jsonplaceholder.typicode.com/posts/1
private final Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://jsonplaceholder.typicode.com/posts/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
private final RestPostsService restPostsService = retrofit.create(RestPostsService.class);
private Observable<Posts> getPostById(int id) {
return restPostsService.getPostsById(id);
}
RestPostService.java
package app.com.rxretrofit;
import retrofit2.http.GET;
import retrofit2.http.Path;
import rx.Observable;
/**
* -> Created by Think-Twice-Code-Once on 11/26/2017.
*/
public interface RestPostsService {
@GET("{id}")
Observable<Posts> getPostsById(@Path("id") int id);
}
解决方案1: 顺序调用多个任务时使用,前面任务的结果总是输入下一个任务
getPostById(1)
.concatMap(posts1 -> {
//get post 1 success
return getPostById(posts1.getId() + 1);
})
.concatMap(posts2 -> {
//get post 2 success
return getPostById(posts2.getId() + 1);
})
.concatMap(posts3 -> {
//get post 3success
return getPostById(posts3.getId() + 1);
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(finalPosts -> {
//get post 4 success
Toast.makeText(this, "Final result: " + finalPosts.getId() + " - " + finalPosts.getTitle(),
Toast.LENGTH_LONG).show();
});
解决方案2: 顺序调用多个任务时使用,前一个任务的所有结果作为输入最终任务(例如:上传头像和封面图片后,调用 api 使用这些图片 URL 创建新用户):
Observable
.zip(getPostById(1), getPostById(2), getPostById(3), (posts1, posts2, posts3) -> {
//this method defines how to zip all separate results into one
return posts1.getId() + posts2.getId() + posts3.getId();
})
.flatMap(finalPostId -> {
//after get all first three posts, get the final posts,
// the final posts-id is sum of these posts-id
return getPostById(finalPostId);
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(finalPosts -> {
Toast.makeText(this, "Final posts: " + finalPosts.getId() + " - " + finalPosts.getTitle(),
Toast.LENGTH_SHORT).show();
});
AndroidManifest
<uses-permission android:name="android.permission.INTERNET"/>
root build.gradle
// Top-level build file where you can add configuration options common to all sub-projects/modules.
buildscript {
repositories {
jcenter()
}
dependencies {
classpath 'com.android.tools.build:gradle:2.3.3'
classpath 'me.tatarka:gradle-retrolambda:3.2.0'
classpath 'me.tatarka.retrolambda.projectlombok:lombok.ast:0.2.3.a2'
// NOTE: Do not place your application dependencies here; they belong
// in the individual module build.gradle files
}
// Exclude the version that the android plugin depends on.
configurations.classpath.exclude group: 'com.android.tools.external.lombok'
}
allprojects {
repositories {
jcenter()
}
}
task clean(type: Delete) {
delete rootProject.buildDir
}
app/build.gradle
apply plugin: 'me.tatarka.retrolambda'
apply plugin: 'com.android.application'
android {
compileSdkVersion 26
buildToolsVersion "26.0.1"
defaultConfig {
applicationId "app.com.rxretrofit"
minSdkVersion 15
targetSdkVersion 26
versionCode 1
versionName "1.0"
testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
}
buildTypes {
release {
minifyEnabled false
proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
}
}
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
dependencies {
compile fileTree(dir: 'libs', include: ['*.jar'])
androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
exclude group: 'com.android.support', module: 'support-annotations'
})
compile 'com.android.support:appcompat-v7:26.+'
compile 'com.android.support.constraint:constraint-layout:1.0.2'
testCompile 'junit:junit:4.12'
provided 'org.projectlombok:lombok:1.16.6'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:converter-gson:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
}
型号
package app.com.rxretrofit;
import com.google.gson.annotations.SerializedName;
/**
* -> Created by Think-Twice-Code-Once on 11/26/2017.
*/
public class Posts {
@SerializedName("userId")
private int userId;
@SerializedName("id")
private int id;
@SerializedName("title")
private String title;
@SerializedName("body")
private String body;
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
}
顺便说一下,使用 Rx + Retrofit + Dagger + MVP 模式 是一个很好的组合。