onNext() 方法只被调用一次
onNext() method is called only once
我正在学习 RxJava，并实现了一个简单的应用程序：先从 JSONPlaceholder 加载 posts 列表，然后针对每个 post 调用另一个 API 加载它的评论，并更新 posts 适配器。问题是 post observable 的 onNext() 方法只被调用了一次，因此只有一个 post 的评论被加载出来。
这是我的 activity
public class MainActivity extends AppCompatActivity {
private static final String TAG = "MainActivity";
private RecyclerView rv;
private PostsAdapter adapter;
private CompositeDisposable disposables = new CompositeDisposable();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
rv = findViewById(R.id.rv);
rv.setLayoutManager(new LinearLayoutManager(this,
RecyclerView.VERTICAL, false));
adapter = new PostsAdapter(this);
rv.setAdapter(adapter);
getPostObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Function<Post, ObservableSource<Post>>() {
@Override
public ObservableSource<Post> apply(Post post) throws Exception {
return getCommentsObservable(post);
}
}).subscribe(new Observer<Post>() {
@Override
public void onSubscribe(Disposable d) {
disposables.add(d);
}
@Override
public void onNext(Post post) {
Log.d(TAG, "onNext: called");
updatePost(post);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ", e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: called");
}
});
}
private void updatePost(Post post) {
adapter.updatePost(post);
}
private Observable<Post> getPostObservable() {
return ApiClient.getApi()
.getPosts()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap((Function<List<Post>, ObservableSource<Post>>) posts -> {
adapter.setPosts(posts);
return Observable.fromIterable(posts)
.subscribeOn(Schedulers.io());
});
}
private Observable<Post> getCommentsObservable(final Post post) {
return ApiClient.getApi()
.getComments(post.getId())
.map(new Function<List<Comment>, Post>() {
@Override
public Post apply(List<Comment> comments) throws Exception {
int delay = (new Random().nextInt(5) + 1) * 1000;
Thread.sleep(delay);
post.setComments(comments);
return post;
}
})
.subscribeOn(Schedulers.io());
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear();
}
}
这是适配器
/**
 * RecyclerView adapter that shows one row per {@link Post}.
 *
 * <p>A row shows the post body; while the post has no comments attached a progress bar is
 * visible, afterwards the number of comments is shown instead.
 */
public class PostsAdapter extends RecyclerView.Adapter<PostsAdapter.ViewHolder> {
    private final String TAG = "nexa_" + this.getClass().getSimpleName();
    private final Context context;
    private List<Post> dataList = new ArrayList<>();

    /**
     * @param context context kept by the adapter
     */
    public PostsAdapter(Context context) {
        this.context = context;
    }

    @NonNull
    @Override
    public ViewHolder onCreateViewHolder(@NonNull ViewGroup parent, int viewType) {
        return new ViewHolder(
                LayoutInflater.from(parent.getContext())
                        .inflate(R.layout.item_posts, parent, false)
        );
    }

    /**
     * Replaces the displayed posts and refreshes the whole list.
     *
     * @param posts the new posts; {@code null} is treated as an empty list
     */
    public void setPosts(List<Post> posts) {
        // A null list would make getItemCount() throw, so fall back to an empty one.
        this.dataList = posts != null ? posts : new ArrayList<>();
        notifyDataSetChanged();
    }

    /**
     * Replaces the stored entry equal to {@code post} and refreshes only that row.
     *
     * <p>Does nothing when the post is not part of the current list.
     *
     * @param post the updated post
     */
    public void updatePost(Post post) {
        int position = dataList.indexOf(post);
        if (position < 0) {
            // indexOf returns -1 for an unknown post; set(-1, ...) would throw.
            return;
        }
        dataList.set(position, post);
        notifyItemChanged(position);
    }

    @Override
    public void onBindViewHolder(@NonNull ViewHolder holder, int position) {
        if (dataList.isEmpty()) {
            return;
        }
        holder.bind(dataList.get(position));
    }

    @Override
    public int getItemCount() {
        return dataList.size();
    }

    /** Row holder with the body text, the comment count and the loading indicator. */
    public class ViewHolder extends RecyclerView.ViewHolder {
        TextView tvNumber, tvDesc;
        ProgressBar progressBar;

        public ViewHolder(@NonNull View itemView) {
            super(itemView);
            tvNumber = itemView.findViewById(R.id.tv_number);
            tvDesc = itemView.findViewById(R.id.tv_desc);
            progressBar = itemView.findViewById(R.id.progressbar);
        }

        /**
         * Renders {@code post} into this row.
         *
         * @param post the post to display
         */
        public void bind(Post post) {
            tvDesc.setText(post.getBody());
            if (post.getComments() == null) {
                // Comments not attached yet: show the spinner and clear the count.
                toggleProgressbar(true);
                tvNumber.setText("");
            } else {
                toggleProgressbar(false);
                tvNumber.setText(String.valueOf(post.getComments().size()));
            }
        }

        /**
         * Shows or hides the progress bar.
         *
         * @param show {@code true} for {@code View.VISIBLE}, {@code false} for {@code View.GONE}
         */
        void toggleProgressbar(boolean show) {
            progressBar.setVisibility(show ? View.VISIBLE : View.GONE);
        }
    }
}
ApiClient class
/**
 * Provides {@link ApiInterface} implementations backed by one lazily created Retrofit instance.
 */
public class ApiClient {
    public static final String BASE_URL = "https://jsonplaceholder.typicode.com/";

    /** Created on the first {@link #getApi()} call and reused afterwards. */
    private static Retrofit retrofit = null;

    /**
     * Returns an API implementation, creating the shared Retrofit instance on first use.
     *
     * <p>The HTTP client is built only together with the Retrofit instance. Previously a new
     * client was built on every call and thrown away whenever Retrofit already existed.
     *
     * <p>NOTE(review): the lazy initialisation is not synchronized; confirm whether the first
     * call can happen from several threads at once.
     *
     * @return an implementation of {@link ApiInterface}
     */
    public static ApiInterface getApi() {
        if (retrofit == null) {
            retrofit = new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .client(buildHttpClient())
                    .build();
        }
        return retrofit.create(ApiInterface.class);
    }

    /** Builds the HTTP client: restricted TLS spec, timeouts, and the header interceptor. */
    private static OkHttpClient buildHttpClient() {
        ConnectionSpec spec = new ConnectionSpec.Builder(ConnectionSpec.COMPATIBLE_TLS)
                .tlsVersions(TlsVersion.TLS_1_2, TlsVersion.TLS_1_1, TlsVersion.TLS_1_0)
                .cipherSuites(
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA)
                .build();

        return new OkHttpClient.Builder()
                .connectionSpecs(Collections.singletonList(spec))
                .connectTimeout(60, TimeUnit.SECONDS)
                .writeTimeout(30, TimeUnit.SECONDS)
                .readTimeout(30, TimeUnit.SECONDS)
                .addInterceptor(new Interceptor() {
                    @Override
                    public Response intercept(Chain chain) throws IOException {
                        Request original = chain.request();
                        // Request customization: add request headers
                        Request.Builder requestBuilder = original.newBuilder()
                                .header("Authorization", "");
                        Request request = requestBuilder.build();
                        return chain.proceed(request);
                    }
                })
                .build();
    }
}
ApiInterface 接口
/** Retrofit declaration of the two endpoints used by the app. */
public interface ApiInterface {

    /**
     * Sends a GET request to {@code posts}.
     *
     * @return an observable of the decoded list of posts
     */
    @GET("posts")
    Observable<List<Post>> getPosts();

    /**
     * Sends a GET request to {@code posts/{id}/comments}.
     *
     * @param id value substituted for the {@code id} path placeholder
     * @return an observable of the decoded list of comments
     */
    @GET("posts/{id}/comments")
    Observable<List<Comment>> getComments(@Path("id") int id);
}
问题在于 getCommentsObservable() 是在后台线程上执行的网络调用，它的结果也是在后台线程上发出的，而 onNext() 中需要更新界面。我在其中添加了 observeOn(AndroidSchedulers.mainThread())，现在可以正常工作了。
/**
 * Requests the comments of {@code post} and emits that same post with the comments attached.
 *
 * @param post the post whose comments are requested; it is mutated via {@code setComments}
 * @return an observable that is subscribed on {@code Schedulers.io()} and delivers the post
 *         on the Android main thread
 */
private Observable<Post> getCommentsObservable(final Post post) {
    final Observable<List<Comment>> commentsRequest =
            ApiClient.getApi().getComments(post.getId());

    return commentsRequest
            .map(comments -> {
                // Random pause of 1000-5000 ms before the result is passed on.
                final int pauseMillis = (new Random().nextInt(5) + 1) * 1000;
                Thread.sleep(pauseMillis);
                post.setComments(comments);
                return post;
            })
            .subscribeOn(Schedulers.io())
            // The line added by the fix: hand every finished post over to the main thread.
            .observeOn(AndroidSchedulers.mainThread());
}
我正在学习 RxJava，并实现了一个简单的应用程序：先从 JSONPlaceholder 加载 posts 列表，然后针对每个 post 调用另一个 API 加载它的评论，并更新 posts 适配器。问题是 post observable 的 onNext() 方法只被调用了一次，因此只有一个 post 的评论被加载出来。
这是我的 activity
/**
 * Activity that displays the posts and loads the comments of each post in the background.
 *
 * <p>Flow: load all posts, show them, start a comments request for every post, and refresh
 * the matching row each time a request finishes.
 */
public class MainActivity extends AppCompatActivity {
    private static final String TAG = "MainActivity";

    private RecyclerView rv;
    private PostsAdapter adapter;

    /** Subscriptions registered here are cleared in {@link #onDestroy()}. */
    private final CompositeDisposable disposables = new CompositeDisposable();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        rv = findViewById(R.id.rv);
        rv.setLayoutManager(new LinearLayoutManager(this,
                RecyclerView.VERTICAL, false));
        adapter = new PostsAdapter(this);
        rv.setAdapter(adapter);

        getPostObservable()
                .subscribeOn(Schedulers.io())
                .flatMap(new Function<Post, ObservableSource<Post>>() {
                    @Override
                    public ObservableSource<Post> apply(Post post) throws Exception {
                        return getCommentsObservable(post);
                    }
                })
                // Switch threads only after flatMap. Each comments source emits on an io
                // thread, so an observeOn placed in front of flatMap does not affect the
                // thread on which the observer below is called, and the adapter would be
                // updated from a background thread.
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Post>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        disposables.add(d);
                    }

                    @Override
                    public void onNext(Post post) {
                        Log.d(TAG, "onNext: called");
                        updatePost(post);
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Error level, so a failed request stands out in the log.
                        Log.e(TAG, "onError: ", e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: called");
                    }
                });
    }

    /**
     * Passes an updated post on to the adapter.
     *
     * @param post the post whose row should be refreshed
     */
    private void updatePost(Post post) {
        adapter.updatePost(post);
    }

    /**
     * Loads the post list, gives it to the adapter, and emits its elements individually.
     *
     * @return an observable emitting each loaded post
     */
    private Observable<Post> getPostObservable() {
        return ApiClient.getApi()
                .getPosts()
                .subscribeOn(Schedulers.io())
                // The adapter is modified in the lambda below, hence the hop to the main thread.
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap((Function<List<Post>, ObservableSource<Post>>) posts -> {
                    adapter.setPosts(posts);
                    return Observable.fromIterable(posts)
                            .subscribeOn(Schedulers.io());
                });
    }

    /**
     * Loads the comments for {@code post} and emits the post once they are attached.
     *
     * @param post the post to complete; its comments are set in place
     * @return an observable, subscribed on {@code Schedulers.io()}, emitting {@code post}
     */
    private Observable<Post> getCommentsObservable(final Post post) {
        return ApiClient.getApi()
                .getComments(post.getId())
                .map(new Function<List<Comment>, Post>() {
                    @Override
                    public Post apply(List<Comment> comments) throws Exception {
                        // Waits a random 1000-5000 ms before attaching the comments;
                        // presumably a simulated delay for demonstration purposes.
                        int delay = (new Random().nextInt(5) + 1) * 1000;
                        Thread.sleep(delay);
                        post.setComments(comments);
                        return post;
                    }
                })
                .subscribeOn(Schedulers.io());
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear();
    }
}
这是适配器
/**
 * Adapter binding {@link Post} objects to the rows of a RecyclerView.
 *
 * <p>Each row displays the post body together with either a progress bar (no comments
 * attached yet) or the number of attached comments.
 */
public class PostsAdapter extends RecyclerView.Adapter<PostsAdapter.ViewHolder> {
    private final String TAG = "nexa_" + this.getClass().getSimpleName();
    private final Context context;
    private List<Post> dataList = new ArrayList<>();

    /**
     * @param context context stored by the adapter
     */
    public PostsAdapter(Context context) {
        this.context = context;
    }

    @NonNull
    @Override
    public ViewHolder onCreateViewHolder(@NonNull ViewGroup parent, int viewType) {
        View row = LayoutInflater.from(parent.getContext())
                .inflate(R.layout.item_posts, parent, false);
        return new ViewHolder(row);
    }

    /**
     * Swaps in a new list of posts and redraws everything.
     *
     * @param posts posts to display; a {@code null} argument results in an empty list
     */
    public void setPosts(List<Post> posts) {
        if (posts == null) {
            // Keep dataList non-null so size() and isEmpty() stay safe.
            posts = new ArrayList<>();
        }
        this.dataList = posts;
        notifyDataSetChanged();
    }

    /**
     * Stores {@code post} in place of the equal entry and redraws that single row.
     *
     * <p>A post that is not contained in the current list is ignored.
     *
     * @param post the post carrying the new data
     */
    public void updatePost(Post post) {
        final int index = dataList.indexOf(post);
        if (index == -1) {
            // Not in the list: without this check set(-1, post) throws IndexOutOfBoundsException.
            return;
        }
        dataList.set(index, post);
        notifyItemChanged(index);
    }

    @Override
    public void onBindViewHolder(@NonNull ViewHolder holder, int position) {
        if (dataList.isEmpty()) {
            return;
        }
        holder.bind(dataList.get(position));
    }

    @Override
    public int getItemCount() {
        return dataList.size();
    }

    /** Holds the views of one row: comment count, body text and progress bar. */
    public class ViewHolder extends RecyclerView.ViewHolder {
        TextView tvNumber, tvDesc;
        ProgressBar progressBar;

        public ViewHolder(@NonNull View itemView) {
            super(itemView);
            tvNumber = itemView.findViewById(R.id.tv_number);
            tvDesc = itemView.findViewById(R.id.tv_desc);
            progressBar = itemView.findViewById(R.id.progressbar);
        }

        /**
         * Fills the row from {@code post}.
         *
         * @param post the post shown in this row
         */
        public void bind(Post post) {
            tvDesc.setText(post.getBody());

            boolean commentsMissing = post.getComments() == null;
            toggleProgressbar(commentsMissing);
            if (commentsMissing) {
                tvNumber.setText("");
            } else {
                tvNumber.setText(String.valueOf(post.getComments().size()));
            }
        }

        /**
         * Switches the progress bar between visible and gone.
         *
         * @param show whether the progress bar should be visible
         */
        void toggleProgressbar(boolean show) {
            if (show) {
                progressBar.setVisibility(View.VISIBLE);
            } else {
                progressBar.setVisibility(View.GONE);
            }
        }
    }
}
ApiClient class
/**
 * Entry point for network access: hands out {@link ApiInterface} implementations created
 * from a single cached Retrofit instance.
 */
public class ApiClient {
    public static final String BASE_URL = "https://jsonplaceholder.typicode.com/";

    /** Cached Retrofit instance; {@code null} until {@link #getApi()} is first called. */
    private static Retrofit retrofit = null;

    /**
     * Returns an implementation of the API interface.
     *
     * <p>Retrofit and its HTTP client are constructed once, on the first call. The original
     * code rebuilt the HTTP client on every call although only the first one was ever
     * attached to Retrofit.
     *
     * <p>NOTE(review): access to the cached instance is unsynchronized; verify that
     * concurrent first calls cannot occur.
     *
     * @return an implementation of {@link ApiInterface}
     */
    public static ApiInterface getApi() {
        if (retrofit == null) {
            final OkHttpClient okHttpClient = createOkHttpClient();
            retrofit = new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .client(okHttpClient)
                    .build();
        }
        return retrofit.create(ApiInterface.class);
    }

    /** Creates the OkHttp client with the TLS spec, the timeouts and the header interceptor. */
    private static OkHttpClient createOkHttpClient() {
        ConnectionSpec spec = new ConnectionSpec.Builder(ConnectionSpec.COMPATIBLE_TLS)
                .tlsVersions(TlsVersion.TLS_1_2, TlsVersion.TLS_1_1, TlsVersion.TLS_1_0)
                .cipherSuites(
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
                        CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA)
                .build();

        Interceptor headerInterceptor = new Interceptor() {
            @Override
            public Response intercept(Chain chain) throws IOException {
                Request original = chain.request();
                // Request customization: add request headers
                Request.Builder requestBuilder = original.newBuilder()
                        .header("Authorization", "");
                Request request = requestBuilder.build();
                return chain.proceed(request);
            }
        };

        return new OkHttpClient.Builder()
                .connectionSpecs(Collections.singletonList(spec))
                .connectTimeout(60, TimeUnit.SECONDS)
                .writeTimeout(30, TimeUnit.SECONDS)
                .readTimeout(30, TimeUnit.SECONDS)
                .addInterceptor(headerInterceptor)
                .build();
    }
}
ApiInterface 接口
/** Endpoint declarations consumed by Retrofit. */
public interface ApiInterface {

    /**
     * GET {@code posts}.
     *
     * @return observable carrying the list of posts from the response
     */
    @GET("posts")
    Observable<List<Post>> getPosts();

    /**
     * GET {@code posts/{id}/comments}.
     *
     * @param id replaces the {@code id} placeholder in the request path
     * @return observable carrying the list of comments from the response
     */
    @GET("posts/{id}/comments")
    Observable<List<Comment>> getComments(@Path("id") int id);
}
问题在于 getCommentsObservable() 是在后台线程上执行的网络调用，它的结果也是在后台线程上发出的，而 onNext() 中需要更新界面。我在其中添加了 observeOn(AndroidSchedulers.mainThread())，现在可以正常工作了。
/**
 * Fetches the comments belonging to {@code post}, attaches them, and emits the post.
 *
 * @param post the post to complete; {@code setComments} is called on it
 * @return an observable subscribed on {@code Schedulers.io()} whose item is delivered on
 *         the Android main thread
 */
private Observable<Post> getCommentsObservable(final Post post) {
    // Attaches the loaded comments after a random wait of 1000-5000 ms.
    final Function<List<Comment>, Post> attachComments = loaded -> {
        final int waitMillis = (new Random().nextInt(5) + 1) * 1000;
        Thread.sleep(waitMillis);
        post.setComments(loaded);
        return post;
    };

    return ApiClient.getApi()
            .getComments(post.getId())
            .map(attachComments)
            .subscribeOn(Schedulers.io())
            // Added by the fix: results are delivered on the main thread.
            .observeOn(AndroidSchedulers.mainThread());
}