如何在subscribe 方法中将Observable.fromIterable 中的item 索引传递给onNext?

How to pass the item indexes in Observable.fromIterable to onNext in subscribe method?

我正在尝试使用 RxJava2 加载数据并将其放入 SparseArray。我通过从数组中调用 URL 来获取数据,但我需要按照数组中 URL 的顺序解析响应并将其插入到 SparseArray 中,因此我需要传递索引mUrls.getGroups()

中的字符串项

提前致谢!

@GET
Single<ResponseBody> getChannels(@Url String url);


groups = new SparseArray<>();


Observable.fromIterable(mUrls.getGroups())
    .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        //
        // How can I access the index of the String item in the array?
        //
        .subscribe(new Observer<ResponseBody>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(ResponseBody responseBody) {
                Group group = GroupParser.parseList(responseBody.byteStream(), index);


                groups.put(index, group);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.getMessage());
            }

            @Override
            public void onComplete() {

            }
 });

编辑:

这是已实施的解决方案:

 Observable.defer(() -> {
                        AtomicInteger counter = new AtomicInteger();
                        return Observable.fromIterable(mUrls.getGroups())
                                .map(url -> new Pair(url, counter.getAndIncrement()));
                    }).flatMapSingle(pair ->
                            aPI.getChannels(pair.first.toString())
                                    .map(responseBody -> new Pair(responseBody, pair.second))
                                    .subscribeOn(Schedulers.io())
                    )
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Pair>() {
                        @Override
                        public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(Pair pair) {
                            Pair<ResponseBody, Integer> resultPair =  (Pair<ResponseBody, Integer>) pair;
                            Group group = GroupParser.parseList(resultPair.first.byteStream(),
                                    resultPair.second);

                            groups.put(resultPair.second, group);
                        }

                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "***** message: " + e.getMessage());
                        }

                        @Override
                        public void onComplete() {
                            Log.i(TAG, "***** onComplete.");
                        }
                    });

如果您按顺序处理 URL,您可以在 Observer 中引入一个 index 字段:

Observable.fromIterable(mUrls.getGroups())
    .concatMapSingle(url -> getChannels(url))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<ResponseBody>() {

         int index;  // <----------------------------------------------------

         // ...

         @Override
         public void onNext(ResponseBody responseBody) {
             Group group = GroupParser.parseList(responseBody.byteStream(), index);


             groups.put(index, group);

             index++;  // <---------------------------------------------------------
         }

         // ...
    });

但是,如果您同时处理 URL,则必须将每个 URL 与索引配对并对其进行标记。例如,给定 Pair class:

 Observable.defer(() -> {
     AtomicInteger counter = new AtomicInteger();
     return Observable.fromIterable(mUrls.getGroups())
     .map(url -> Pair.of(url, counter.getAndIncremenet()));
 })
 .flatMapSingle(urlIndex -> 
     getChannels(urlIndex.first)
     .map(v -> Pair.of(v, urlIndex.second))
     .subscribeOn(Schedulers.io())
 )
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(new Observer<Pair<ResponseBody, Integer>>() {

         // ...

         @Override
         public void onNext(Pair<ResponseBody, Integer> pair) {
             Group group = GroupParser.parseList(pair.first.byteStream(), pair.second);


             groups.put(pair.second, group);
         }

         // ...
    });