RXJava Android 进行多个不同长度的调用

RXJava Android make multiple calls with different lengths

我需要先从服务器获取摄像头 ID,然后通过该 ID 获取该摄像头的事件,最后通过 ID 获取图像。然后,所有这些都需要组合成一个 UI 帖子(Post){摄像头名称 - 图片 url - 它的事件}。例如,我们可能有 5 个摄像头名称、30 个事件 ID(每个摄像头 6 个)和 30 个 url。我尝试使用 zip 来组合分别获取摄像头名称、事件 ID 和图像 url 的 Observable api 调用。但据我所知,由于 zip 无法把 5 个名称和 30 个 ID 正确地对应起来,所以它无法正常工作。请问这种情况下应该用什么来实现?

 // API service handle created through NetworkService (both types are defined outside this snippet).
 GetApiMethods getApiMethods = NetworkService.getInstance().createService(GetApiMethods.class);
    // Source streams produced by the getCamera()/getEvents() helpers shown below.
    Observable<Camera> cameras = getCamera();
    Observable<Event> events = getEvents();
    // zip pairs items by position (first camera with first event, and so on), so it cannot
    // express "one camera has several events". The middle Post argument is the literal
    // string "null", i.e. no image URL is supplied here.
    Observable<Post> combined = Observable.zip(cameras, events, (camera, event) -> new Post(camera.getDisplayName(),"null",event.getType()));


    /**
     * Subscribes to {@code combined} and returns the {@code liveData} field.
     *
     * <p>The subscription is made on the IO scheduler and results are observed on the Android
     * main thread. Every observer callback below is empty, so nothing visible in this method
     * writes to {@code liveData}; that field is not declared in this snippet.
     *
     * @return the {@code liveData} field
     */
    public LiveData<String> CameraRequests() {
        combined.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                // Re-wraps each emitted Post via fromArray and concatenates the results.
                // NOTE(review): this looks like a pass-through that adds nothing - confirm.
                .concatMap(Observable::fromArray)
                .subscribe(new Observer<Post>() {

                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        // The Disposable is not stored, so this subscription cannot be cancelled later.

                    }

                    @Override
                    public void onNext(@NonNull Post post) {
                        // Empty: the received Post is dropped.
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        // Empty: errors are ignored.

                    }

                    @Override
                    public void onComplete() {

                    }
                });

        return liveData;
    }

    /**
     * Presumably intended to expose the events of every camera as a stream.
     *
     * <p>NOTE(review): the {@code allEvents} emitter handed to {@code Observable.create} is never
     * used inside the lambda, and the inner observer's {@code onNext} is empty, so as written
     * nothing is forwarded to subscribers of the returned Observable.
     *
     * @return an Observable declared as {@code Observable<Event>}; see the note above
     */
    public Observable<Event> getEvents() {
        return Observable.create(allEvents -> getApiMethods.getCameras()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                // Flatten the response into individual elements.
                // NOTE(review): applied twice in a row, so the response is presumably a nested
                // collection - confirm against the GetApiMethods declaration.
                .concatMap(Observable::fromIterable)
                .concatMap(Observable::fromIterable)
                // For each camera, request its events using the access point with the
                // substring "hosts" removed.
                .concatMap(camera -> getApiMethods.getEvents(camera.getAccessPoint().replace("hosts", "")))
                .concatMap(Observable::fromIterable)
                // Only the id survives from here on, so the inner stream carries String, not Event.
                .map(Event::getId)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull String event) {
                        // Empty: the id is not passed on to allEvents.
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        // Logged at debug level only; the error is not propagated to allEvents.
                        Log.d("RXJAVA1", String.valueOf(e));
                    }

                    @Override
                    public void onComplete() {

                    }
                })
        );

    }



    /**
     * Presumably intended to expose the cameras returned by the API as a stream.
     *
     * <p>NOTE(review): the {@code cameras1} emitter handed to {@code Observable.create} is never
     * used inside the lambda, and the inner observer's {@code onNext} is empty, so as written
     * nothing is forwarded to subscribers of the returned Observable.
     *
     * @return an Observable declared as {@code Observable<Camera>}; see the note above
     */
    public Observable<Camera> getCamera() {

        return Observable.create(cameras1 -> getApiMethods.getCameras()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                // Flatten the API response into individual elements.
                .concatMap(Observable::fromIterable)
                // Only the display name survives, so the inner stream carries String, not Camera.
                .map(Camera::getDisplayName)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull String s) {
                        // Empty: the name is not passed on to cameras1.

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        // Logged at debug level only; the error is not propagated to cameras1.
                        Log.d("RXJAVA", String.valueOf(e));
                    }

                    @Override
                    public void onComplete() {
                        

                    }
                })

        );

    }

如果使用 flatMap:

Observable<Post> getPostsObservable() {
        // Level 1: every camera is turned into its own stream of posts.
        return getCameras().flatMap(camera -> {
            // Level 2: every event of that camera is turned into a stream of posts.
            return getEvents(camera.accessPoint).flatMap(event -> {
                // Level 3: once the image URL arrives, camera and event are still in
                // scope through the enclosing lambdas, so the Post can be assembled here.
                return getImageUrl(event.id).map(imageUrl -> new Post(camera, event, imageUrl));
            });
        });
    }

您可能需要稍微调整一下代码,例如简化 getCameras 和 getEvents 方法。您可以查看下面模仿您的应用程序的演示,以更好地了解它的工作原理以及哪些地方需要修改。如果调用 getPosts 方法,您应该会在 logcat 中看到帖子被正确地创建和接收。

import android.util.Log;

import androidx.annotation.NonNull;

import java.util.ArrayList;
import java.util.List;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

/**
 * Self-contained demo of combining three dependent API calls with nested {@code flatMap}:
 * cameras, then the events of each camera, then the image URL of each event. All API calls
 * are mocked with {@code Observable.just}, so the class runs without a network.
 *
 * <p>Call {@link #getPosts()} to run the chain; each resulting post is written to logcat.
 */
public class RxDemo {
    /** Logcat tag used for emitted posts and for stream errors. */
    private static final String TAG = "AAAA";

    /** Number of cameras returned by the mocked camera endpoint. */
    private static final int MOCK_CAMERA_COUNT = 5;

    /** Number of events returned per camera by the mocked events endpoint. */
    private static final int MOCK_EVENTS_PER_CAMERA = 6;

    /** Stand-in for the application's camera model (replace with your own class). */
    public class Camera {
        private final String accessPoint;

        public Camera(String accessPoint) {
            this.accessPoint = accessPoint;
        }

        public String getAccessPoint() {
            return accessPoint;
        }
    }

    /** Stand-in for the application's event model (replace with your own class). */
    public class Event {
        private final String id;

        public Event(String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }
    }

    /** The combined result: one camera, one of its events, and that event's image URL. */
    public class Post {
        private final Camera camera;
        private final Event event;
        private final String imageUrl;

        public Post(Camera camera, Event event, String imageUrl) {
            this.camera = camera;
            this.event = event;
            this.imageUrl = imageUrl;
        }

        public Camera getCamera() {
            return camera;
        }

        public Event getEvent() {
            return event;
        }

        public String getImageUrl() {
            return imageUrl;
        }

        /** Renders the post as {@code camera: <accessPoint>; event: <id>; imageUrl: <url>}. */
        @NonNull
        @Override
        public String toString() {
            return "camera: " + camera.getAccessPoint()
                    + "; event: " + event.getId()
                    + "; imageUrl: " + imageUrl;
        }
    }

    /**
     * Mock of {@code getApiMethods.getCameras()}.
     *
     * @return an Observable emitting one list with cameras {@code cam_1} .. {@code cam_5}
     */
    private Observable<List<Camera>> apiGetCameras() {
        List<Camera> cameras = new ArrayList<>(MOCK_CAMERA_COUNT);
        for (int i = 1; i <= MOCK_CAMERA_COUNT; i++) {
            cameras.add(new Camera("cam_" + i));
        }
        return Observable.just(cameras);
    }

    /**
     * Mock of {@code getApiMethods.getEvents(...)}.
     *
     * @param cameraId identifier appended to every generated event id
     * @return an Observable emitting one list with events
     *         {@code event_1_<cameraId>} .. {@code event_6_<cameraId>}
     */
    private Observable<List<Event>> apiGetEvents(String cameraId) {
        List<Event> events = new ArrayList<>(MOCK_EVENTS_PER_CAMERA);
        for (int i = 1; i <= MOCK_EVENTS_PER_CAMERA; i++) {
            events.add(new Event("event_" + i + "_" + cameraId));
        }
        return Observable.just(events);
    }

    /**
     * Mock of {@code getApiMethods.getImageUrl(...)}.
     *
     * @param eventId id of the event whose image is requested
     * @return an Observable emitting a single URL built from {@code eventId}
     */
    private Observable<String> apiGetImageUrl(String eventId) {
        return Observable.just("http://my.domain.com/" + eventId + ".png");
    }

    /**
     * Flattens the camera list response into individual cameras.
     *
     * @return an Observable emitting each camera of the response in list order
     */
    public Observable<Camera> getCameras() {
        return apiGetCameras().concatMapIterable(cam -> cam);
    }

    /**
     * Flattens the event list response of one camera into individual events.
     *
     * @param cameraId identifier passed to the events endpoint
     * @return an Observable emitting each event of the response in list order
     */
    public Observable<Event> getEvents(String cameraId) {
        return apiGetEvents(cameraId).concatMapIterable(event -> event);
    }

    /**
     * Builds the stream of posts.
     *
     * <p>The lambdas are nested rather than chained so that {@code camera} and {@code event}
     * stay in scope when the image URL arrives. {@code flatMap} does not guarantee emission
     * order once the inner sources are asynchronous; switch to {@code concatMap} if posts must
     * follow camera/event order.
     *
     * @return an Observable emitting one {@link Post} per (camera, event) pair
     */
    private Observable<Post> getPostsObservable() {
        return getCameras().flatMap(camera ->
                // for each camera, build observable to get events
                getEvents(camera.getAccessPoint()).flatMap(event ->
                        // for each event, build observable to get imageUrl
                        apiGetImageUrl(event.getId()).map(imageUrl ->
                                // combine camera, event and imageUrl in current rx-chain
                                new Post(camera, event, imageUrl)
                        )
                )
        );
    }

    /**
     * Runs the chain on the IO scheduler and logs every post.
     *
     * <p>Call this from an activity, fragment or view model. There is no {@code observeOn},
     * so the callbacks run on the thread the stream emits on; add
     * {@code observeOn(AndroidSchedulers.mainThread())} before touching UI from them.
     */
    public void getPosts() {
        getPostsObservable()
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<Post>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        // Demo only: the Disposable is not retained. Real code should keep it
                        // and dispose it when the owning component is destroyed.
                    }

                    @Override
                    public void onNext(@NonNull Post post) {
                        Log.d(TAG, post.toString());
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        // Surface failures instead of dropping them silently.
                        Log.e(TAG, "Failed to load posts", e);
                    }

                    @Override
                    public void onComplete() {
                        // Nothing to do once all posts have been emitted.
                    }
                });
    }
}