onNext() 方法只被调用一次

onNext() method is called only once

我正在学习 RxJava,并实现了一个简单的应用程序:先从 JSONPlaceholder 加载帖子(posts)列表,然后针对每个帖子调用另一个 API 加载它的评论,并更新帖子适配器。问题是 post Observable 的 onNext() 方法只被调用了一次,因此只有一个帖子的评论被加载出来。

这是我的 activity

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "MainActivity";

    private RecyclerView rv;
    private PostsAdapter adapter;

    private CompositeDisposable disposables = new CompositeDisposable();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        rv = findViewById(R.id.rv);
        rv.setLayoutManager(new LinearLayoutManager(this,
                RecyclerView.VERTICAL, false));
        adapter = new PostsAdapter(this);
        rv.setAdapter(adapter);

        getPostObservable()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(new Function<Post, ObservableSource<Post>>() {
                    @Override
                    public ObservableSource<Post> apply(Post post) throws Exception {
                        return getCommentsObservable(post);
                    }
                }).subscribe(new Observer<Post>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposables.add(d);
            }

            @Override
            public void onNext(Post post) {
                Log.d(TAG, "onNext: called");
                updatePost(post);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ", e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: called");
            }
        });

    }

    private void updatePost(Post post) {
        adapter.updatePost(post);
    }

    private Observable<Post> getPostObservable() {
        return ApiClient.getApi()
                .getPosts()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap((Function<List<Post>, ObservableSource<Post>>) posts -> {
                    adapter.setPosts(posts);
                    return Observable.fromIterable(posts)
                            .subscribeOn(Schedulers.io());
                });
    }

    private Observable<Post> getCommentsObservable(final Post post) {
        return ApiClient.getApi()
                .getComments(post.getId())
                .map(new Function<List<Comment>, Post>() {
                    @Override
                    public Post apply(List<Comment> comments) throws Exception {
                        int delay = (new Random().nextInt(5) + 1) * 1000;
                        Thread.sleep(delay);
                        post.setComments(comments);
                        return post;
                    }
                })
                .subscribeOn(Schedulers.io());
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear();
    }
}

这是适配器

/**
 * RecyclerView adapter that shows one row per post: the post body plus either a progress
 * indicator (comments not loaded yet) or the number of loaded comments.
 */
public class PostsAdapter extends RecyclerView.Adapter<PostsAdapter.ViewHolder> {

    private final String TAG = "nexa_" + this.getClass().getSimpleName();

    private Context context;
    private List<Post> dataList = new ArrayList<>();

    public PostsAdapter(Context context) {
        this.context = context;
    }

    @NonNull
    @Override
    public ViewHolder onCreateViewHolder(@NonNull ViewGroup parent, int viewType) {
        return new ViewHolder(
                LayoutInflater.from(parent.getContext())
                        .inflate(R.layout.item_posts, parent, false)
        );
    }

    /**
     * Replaces the displayed posts and redraws the whole list.
     *
     * @param posts the posts to show; {@code null} is treated as an empty list so that
     *              {@link #getItemCount()} never dereferences a null list
     */
    public void setPosts(List<Post> posts) {
        if (posts == null) {
            this.dataList = new ArrayList<>();
        } else {
            this.dataList = posts;
        }
        notifyDataSetChanged();
    }

    /**
     * Replaces the stored entry that equals {@code post} and refreshes only that row.
     *
     * <p>If no stored entry equals {@code post} the call does nothing. Previously the
     * {@code -1} returned by {@code indexOf} was passed straight to {@code List.set}, which
     * throws {@link IndexOutOfBoundsException}.
     *
     * @param post the updated post
     */
    public void updatePost(Post post) {
        // Look the position up once; it is needed for both the replacement and the refresh.
        final int position = dataList.indexOf(post);
        if (position < 0) {
            return;
        }
        dataList.set(position, post);
        notifyItemChanged(position);
    }

    @Override
    public void onBindViewHolder(@NonNull ViewHolder holder, int position) {
        if (dataList.isEmpty()) {
            return;
        }
        holder.bind(dataList.get(position));
    }

    @Override
    public int getItemCount() {
        return dataList.size();
    }

    /** Holds the views of a single post row. */
    public class ViewHolder extends RecyclerView.ViewHolder {

        TextView tvNumber, tvDesc;
        ProgressBar progressBar;

        public ViewHolder(@NonNull View itemView) {
            super(itemView);

            tvNumber = itemView.findViewById(R.id.tv_number);
            tvDesc = itemView.findViewById(R.id.tv_desc);
            progressBar = itemView.findViewById(R.id.progressbar);
        }

        /**
         * Renders {@code post}: shows the progress bar while its comments are {@code null},
         * otherwise hides it and shows the comment count.
         */
        public void bind(Post post) {
            tvDesc.setText(post.getBody());

            if (post.getComments() == null) {
                toggleProgressbar(true);
                tvNumber.setText("");
            } else {
                toggleProgressbar(false);
                tvNumber.setText(String.valueOf(post.getComments().size()));
            }
        }

        /** Shows the progress bar when {@code show} is true, otherwise removes it from layout. */
        void toggleProgressbar(boolean show) {
            progressBar.setVisibility(show ? View.VISIBLE : View.GONE);
        }
    }
}

ApiClient class

/**
 * Entry point for obtaining the Retrofit-backed {@link ApiInterface}.
 *
 * <p>The {@code Retrofit} instance is created lazily on the first call and reused afterwards.
 */
public class ApiClient {

    public static final String BASE_URL = "https://jsonplaceholder.typicode.com/";
    private static Retrofit retrofit = null;

    /**
     * Returns an {@link ApiInterface} implementation backed by the shared Retrofit instance.
     *
     * <p>The method is synchronized because it lazily initialises a static field and may be
     * called from several threads. The HTTP client is built only when the Retrofit instance
     * itself has to be created; previously a new client was constructed and thrown away on
     * every call.
     *
     * @return a Retrofit-generated implementation of {@link ApiInterface}
     */
    public static synchronized ApiInterface getApi() {
        if (retrofit == null) {
            retrofit = new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .client(buildHttpClient())
                    .build();
        }
        return retrofit.create(ApiInterface.class);
    }

    /** Builds the OkHttp client (TLS settings, timeouts, header interceptor) used by Retrofit. */
    private static OkHttpClient buildHttpClient() {
        // Only the listed TLS versions and cipher suites are offered for connections.
        ConnectionSpec spec = new ConnectionSpec.Builder(ConnectionSpec.COMPATIBLE_TLS)
                .tlsVersions(TlsVersion.TLS_1_2, TlsVersion.TLS_1_1, TlsVersion.TLS_1_0)
                .cipherSuites(
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA)
                .build();

        return new OkHttpClient.Builder()
                .connectionSpecs(Collections.singletonList(spec))
                .connectTimeout(60, TimeUnit.SECONDS)
                .writeTimeout(30, TimeUnit.SECONDS)
                .readTimeout(30, TimeUnit.SECONDS)
                .addInterceptor(new Interceptor() {
                    @Override
                    public Response intercept(Chain chain) throws IOException {
                        Request original = chain.request();

                        // Request customization: add request headers
                        // (the Authorization header is currently sent with an empty value).
                        Request.Builder requestBuilder = original.newBuilder()
                                .header("Authorization", "");

                        Request request = requestBuilder.build();
                        return chain.proceed(request);
                    }
                })
                .build();
    }
}

ApiInterface 接口

/**
 * Retrofit declaration of the HTTP endpoints used by the app. The relative paths in the
 * annotations are resolved against the base URL configured on the Retrofit instance.
 */
public interface ApiInterface {

    /**
     * Issues {@code GET posts}.
     *
     * @return an observable emitting the decoded list of posts
     */
    @GET("posts")
    Observable<List<Post>> getPosts();

    /**
     * Issues {@code GET posts/{id}/comments}.
     *
     * @param id value substituted for the {@code id} placeholder of the path
     * @return an observable emitting the decoded list of comments
     */
    @GET("posts/{id}/comments")
    Observable<List<Comment>> getComments(@Path("id") int id);
}

问题在于 getCommentsObservable() 发起的网络请求运行在后台线程上,其结果也是在后台线程上发出的。我在该方法的调用链末尾添加了 observeOn(AndroidSchedulers.mainThread()),现在可以正常工作了。

/**
 * Requests the comments of {@code post} and emits that same post with the comments attached.
 *
 * @param post the post whose comments are requested; mutated through {@code setComments}
 * @return an observable that subscribes on the io scheduler and delivers on the main thread
 */
private Observable<Post> getCommentsObservable(final Post post) {
    return ApiClient.getApi()
            .getComments(post.getId())
            .map(comments -> {
                // Artificial pause of 1-5 seconds before the result is passed on.
                int pauseMillis = (new Random().nextInt(5) + 1) * 1000;
                Thread.sleep(pauseMillis);
                post.setComments(comments);
                return post;
            })
            .subscribeOn(Schedulers.io())
            // This is the line that was added as the fix: results are handed to the
            // subscriber on the main thread instead of the io thread.
            .observeOn(AndroidSchedulers.mainThread());
}