RxJava - 似乎永远阻塞的调试链

RxJava - Debug chain that seems to block forever

我是运行以下代码:

List<GroupedObservable<BcxToDoList, BcxToDo>> mToDoList;

mToDoList = bcxClient
  .fetchToDos()
  .flatMap(new Func1<List<BcxToDo>, Observable<BcxToDo>>() {
    @Override
    public Observable<BcxToDo> call(List<BcxToDo> bcxToDos) {
      return Observable.from(bcxToDos);
    }
  })
  .groupBy(new Func1<BcxToDo, BcxToDoList>() {
    @Override
    public BcxToDoList call(BcxToDo bcxToDo) {
      return bcxToDo.toDoList;
    }
  })
  .toList()
  .toBlocking()
  .single();

当我在 Android Studio 中进入这段代码时,代码无限期地阻塞。如果我使用 subscribe() 来捕获它,没有例外。

调试正在发生的事情的最佳方法是什么?

更新

按照@dwursteisen 的建议,我使用.doOnNext() 来查看.groupBy() 发出了什么。它正在创建我预期的输出,只是从未发送过 onCompleted 通知。

根据 these tickets,在 RxJava 中这是设计使然。为了使 toList 运算符起作用,必须处理每个 GroupedObservable。

这是我修改后的代码:

List<BcxToDoList> mToDoList;

mToDoList = bcxClient
  .fetchToDos()
  .flatMap(new Func1<List<BcxToDo>, Observable<BcxToDo>>() {
    @Override
    public Observable<BcxToDo> call(List<BcxToDo> bcxToDos) {
      return Observable.from(bcxToDos);
    }
  })
  .groupBy(new Func1<BcxToDo, BcxToDoList>() {
    @Override
    public BcxToDoList call(BcxToDo bcxToDo) {
      return bcxToDo.toDoList;
    }
  })
  .flatMap( new Func1<GroupedObservable<BcxToDoList, BcxToDo>, Observable<BcxToDoList>>() {
    @Override
    public Observable<BcxToDoList> call(final GroupedObservable<BcxToDoList, BcxToDo> bcxToDoListBcxToDoGroupedObservable) {
      return bcxToDoListBcxToDoGroupedObservable
        .toList()
        .flatMap( new Func1<List<BcxToDo>, Observable<BcxToDoList>>() {
          @Override
          public Observable<BcxToDoList> call(List<BcxToDo> bcxToDos) {
            bcxToDoListBcxToDoGroupedObservable.getKey().toDos.addAll( bcxToDos );

            return Observable.just( bcxToDoListBcxToDoGroupedObservable.getKey() );
          }
        });
    }
  })

  .toList()

  .toBlocking()
  .single();

不如第一个代码片段优雅,但至少它不再阻塞!如果我可以做些什么来使新代码片段更具可读性,我将不胜感激任何建议。

使用 RxJava 进行调试可能会很棘手。

您可以添加 .doOnNext() 调用,这样您就可以显示 RxJava 通知并查看发生了什么。

根据您的代码,我认为您的代码块在您使用 toList 运算符时只有在 Observable 完成时才会发出。

我认为您的流未完成,因此 toList 永远不会发出,然后您的代码将永远阻塞。

SO answer 提供了正确的调试方法 RxJava:

By now I can't reproduce the problem but I've found rxdebug-java a very good tool for debugging.

It's use is simple: add the library as a dependency and at application start register a listener:

RxJavaPlugins.getInstance().registerObservableExecutionHook(new DebugHook(new DebugNotificationListener() {
  public Object onNext(DebugNotification n) {
      Log.v(TAG,"onNext on "+n);
      return super.onNext(n);
  }


    public Object start(DebugNotification n) {
        Log.v(TAG,"start on "+n);
        return super.start(n);
    }


    public void complete(Object context) {
        super.complete(context);
        Log.v(TAG,"onNext on "+context);
    }

  public void error(Object context, Throwable e) {
      super.error(context, e);
      Log.e(TAG,"error on "+context);
  }
}));