RXJava Android 进行多个不同长度的调用
RXJava Android make multiple calls with different lengths
我需要从服务器获取摄像头ID,然后通过ID获取该摄像头的事件,最后通过ID获取图像。
然后,所有这些都需要组合成一个UIpost{相机名称-图片url-it事件}
例如,我们可能有 5 个摄像机名称、30 个事件 ID(一个摄像机名称为 6 个)和 30 url。
我尝试使用 zip 组合 Observable api 调用(分别)获取相机名称、事件 ID 和图像 url。但据我所知,由于 zip 无法正确匹配 5 个名称和 30 个 ID,因此它无法正常工作
告诉我这有什么用?
GetApiMethods getApiMethods = NetworkService.getInstance().createService(GetApiMethods.class);
Observable<Camera> cameras = getCamera();
Observable<Event> events = getEvents();
Observable<Post> combined = Observable.zip(cameras, events, (camera, event) -> new Post(camera.getDisplayName(),"null",event.getType()));
public LiveData<String> CameraRequests() {
combined.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.concatMap(Observable::fromArray)
.subscribe(new Observer<Post>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Post post) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
return liveData;
}
public Observable<Event> getEvents() {
return Observable.create(allEvents -> getApiMethods.getCameras()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.concatMap(Observable::fromIterable)
.concatMap(Observable::fromIterable)
.concatMap(camera -> getApiMethods.getEvents(camera.getAccessPoint().replace("hosts", "")))
.concatMap(Observable::fromIterable)
.map(Event::getId)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String event) {
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("RXJAVA1", String.valueOf(e));
}
@Override
public void onComplete() {
}
})
);
}
public Observable<Camera> getCamera() {
return Observable.create(cameras1 -> getApiMethods.getCameras()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.concatMap(Observable::fromIterable)
.map(Camera::getDisplayName)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("RXJAVA", String.valueOf(e));
}
@Override
public void onComplete() {
}
})
);
}
如果使用 flatMap
:
Observable<Post> getPostsObservable() {
return getCameras().flatMap(camera ->
// for each camera, build observable to get events
getEvents(camera.accessPoint).flatMap(event ->
// for each event, build observable to get imageUrl
getImageUrl(event.id).map(imageUrl ->
// combine camera, event and imageUrl in current rx-chain
new Post(camera, event, imageUrl)
)
)
);
}
您可能需要稍微调整一下代码,例如简化 getCameras
和 getEvents
方法。您可以查看下面模仿您的应用程序的演示,以更好地了解它的工作原理以及需要返工的内容。如果调用 getPosts
方法,您应该会在 logcat 中看到正在正确创建和接收帖子。
import android.util.Log;
import androidx.annotation.NonNull;
import java.util.ArrayList;
import java.util.List;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class RxDemo {
// your Camera class
public class Camera {
private final String accessPoint;
public Camera(String accessPoint) {
this.accessPoint = accessPoint;
}
public String getAccessPoint() {
return accessPoint;
}
}
// your Event class
public class Event {
private final String id;
public Event(String id) {
this.id = id;
}
public String getId() {
return id;
}
}
// your Post class
public class Post {
private final Camera camera;
private final Event event;
private final String imageUrl;
public Post(Camera camera, Event event, String imageUrl) {
this.camera = camera;
this.event = event;
this.imageUrl = imageUrl;
}
public Camera getCamera() {
return camera;
}
public Event getEvent() {
return event;
}
public String getImageUrl() {
return imageUrl;
}
@NonNull
@Override
public String toString() {
return "camera: " + camera.accessPoint + "; event: " + event.id + "; imageUrl: " + imageUrl;
}
}
// mocked getApiMethods.getCameras(), return 5 mocked cameras
private Observable<List<Camera>> apiGetCameras() {
List<Camera> cameras = new ArrayList<>();
cameras.add(new Camera("cam_1"));
cameras.add(new Camera("cam_2"));
cameras.add(new Camera("cam_3"));
cameras.add(new Camera("cam_4"));
cameras.add(new Camera("cam_5"));
return Observable.just(cameras);
}
// mocked getApiMethods.getEvents, return 6 mocked events with cameraId postFix
private Observable<List<Event>> apiGetEvents(String cameraId) {
List<Event> events = new ArrayList<>();
events.add(new Event("event_1_" + cameraId));
events.add(new Event("event_2_" + cameraId));
events.add(new Event("event_3_" + cameraId));
events.add(new Event("event_4_" + cameraId));
events.add(new Event("event_5_" + cameraId));
events.add(new Event("event_6_" + cameraId));
return Observable.just(events);
}
// mocked getApiMethods.getImageUrl
private Observable<String> apiGetImageUrl(String eventId) {
return Observable.just("http://my.domain.com/" + eventId + ".png");
}
public Observable<Camera> getCameras() {
return apiGetCameras().concatMapIterable(cam -> cam);
}
public Observable<Event> getEvents(String cameraId) {
return apiGetEvents(cameraId).concatMapIterable(event -> event);
}
// build observable that emits posts
private Observable<Post> getPostsObservable() {
return getCameras().flatMap(camera ->
// for each camera, build observable to get events
getEvents(camera.accessPoint).flatMap(event ->
// for each event, build observable to get imageUrl
apiGetImageUrl(event.id).map(imageUrl ->
// combine camera, event and imageUrl in current rx-chain
new Post(camera, event, imageUrl)
)
)
);
}
// call this in your activity/fragment/viewModel to get the posts
public void getPosts() {
getPostsObservable()
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Post>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Post post) {
Log.d("AAAA", post.toString());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
我需要从服务器获取摄像头ID,然后通过ID获取该摄像头的事件,最后通过ID获取图像。 然后,所有这些都需要组合成一个UIpost{相机名称-图片url-it事件} 例如,我们可能有 5 个摄像机名称、30 个事件 ID(一个摄像机名称为 6 个)和 30 url。 我尝试使用 zip 组合 Observable api 调用(分别)获取相机名称、事件 ID 和图像 url。但据我所知,由于 zip 无法正确匹配 5 个名称和 30 个 ID,因此它无法正常工作 告诉我这有什么用?
GetApiMethods getApiMethods = NetworkService.getInstance().createService(GetApiMethods.class);
Observable<Camera> cameras = getCamera();
Observable<Event> events = getEvents();
Observable<Post> combined = Observable.zip(cameras, events, (camera, event) -> new Post(camera.getDisplayName(),"null",event.getType()));
public LiveData<String> CameraRequests() {
combined.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.concatMap(Observable::fromArray)
.subscribe(new Observer<Post>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Post post) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
return liveData;
}
public Observable<Event> getEvents() {
return Observable.create(allEvents -> getApiMethods.getCameras()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.concatMap(Observable::fromIterable)
.concatMap(Observable::fromIterable)
.concatMap(camera -> getApiMethods.getEvents(camera.getAccessPoint().replace("hosts", "")))
.concatMap(Observable::fromIterable)
.map(Event::getId)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String event) {
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("RXJAVA1", String.valueOf(e));
}
@Override
public void onComplete() {
}
})
);
}
public Observable<Camera> getCamera() {
return Observable.create(cameras1 -> getApiMethods.getCameras()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.concatMap(Observable::fromIterable)
.map(Camera::getDisplayName)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("RXJAVA", String.valueOf(e));
}
@Override
public void onComplete() {
}
})
);
}
如果使用 flatMap
:
Observable<Post> getPostsObservable() {
return getCameras().flatMap(camera ->
// for each camera, build observable to get events
getEvents(camera.accessPoint).flatMap(event ->
// for each event, build observable to get imageUrl
getImageUrl(event.id).map(imageUrl ->
// combine camera, event and imageUrl in current rx-chain
new Post(camera, event, imageUrl)
)
)
);
}
您可能需要稍微调整一下代码,例如简化 getCameras
和 getEvents
方法。您可以查看下面模仿您的应用程序的演示,以更好地了解它的工作原理以及需要返工的内容。如果调用 getPosts
方法,您应该会在 logcat 中看到正在正确创建和接收帖子。
import android.util.Log;
import androidx.annotation.NonNull;
import java.util.ArrayList;
import java.util.List;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class RxDemo {
// your Camera class
public class Camera {
private final String accessPoint;
public Camera(String accessPoint) {
this.accessPoint = accessPoint;
}
public String getAccessPoint() {
return accessPoint;
}
}
// your Event class
public class Event {
private final String id;
public Event(String id) {
this.id = id;
}
public String getId() {
return id;
}
}
// your Post class
public class Post {
private final Camera camera;
private final Event event;
private final String imageUrl;
public Post(Camera camera, Event event, String imageUrl) {
this.camera = camera;
this.event = event;
this.imageUrl = imageUrl;
}
public Camera getCamera() {
return camera;
}
public Event getEvent() {
return event;
}
public String getImageUrl() {
return imageUrl;
}
@NonNull
@Override
public String toString() {
return "camera: " + camera.accessPoint + "; event: " + event.id + "; imageUrl: " + imageUrl;
}
}
// mocked getApiMethods.getCameras(), return 5 mocked cameras
private Observable<List<Camera>> apiGetCameras() {
List<Camera> cameras = new ArrayList<>();
cameras.add(new Camera("cam_1"));
cameras.add(new Camera("cam_2"));
cameras.add(new Camera("cam_3"));
cameras.add(new Camera("cam_4"));
cameras.add(new Camera("cam_5"));
return Observable.just(cameras);
}
// mocked getApiMethods.getEvents, return 6 mocked events with cameraId postFix
private Observable<List<Event>> apiGetEvents(String cameraId) {
List<Event> events = new ArrayList<>();
events.add(new Event("event_1_" + cameraId));
events.add(new Event("event_2_" + cameraId));
events.add(new Event("event_3_" + cameraId));
events.add(new Event("event_4_" + cameraId));
events.add(new Event("event_5_" + cameraId));
events.add(new Event("event_6_" + cameraId));
return Observable.just(events);
}
// mocked getApiMethods.getImageUrl
private Observable<String> apiGetImageUrl(String eventId) {
return Observable.just("http://my.domain.com/" + eventId + ".png");
}
public Observable<Camera> getCameras() {
return apiGetCameras().concatMapIterable(cam -> cam);
}
public Observable<Event> getEvents(String cameraId) {
return apiGetEvents(cameraId).concatMapIterable(event -> event);
}
// build observable that emits posts
private Observable<Post> getPostsObservable() {
return getCameras().flatMap(camera ->
// for each camera, build observable to get events
getEvents(camera.accessPoint).flatMap(event ->
// for each event, build observable to get imageUrl
apiGetImageUrl(event.id).map(imageUrl ->
// combine camera, event and imageUrl in current rx-chain
new Post(camera, event, imageUrl)
)
)
);
}
// call this in your activity/fragment/viewModel to get the posts
public void getPosts() {
getPostsObservable()
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Post>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Post post) {
Log.d("AAAA", post.toString());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}