如何使用 RxJava2 使用 IntentService 线程

How to use IntentService thread using RxJava2

我面临下一个情况。我在 onHadleIntent 方法中开始我的代码,部分代码在 IntentService 线程中工作,但 getInfoAboutUser() 中的 Observable.zip 方法在 RxJava 线程中工作。

@Override
protected void onHandleIntent(@Nullable Intent intent) {
      LOG.debug(Thread.currentThread().getName());

      Call<String> call= mRepository.getInfo();
        try {
            retrofit2.Response<String> response = call.execute();
            if (response.isSuccessful()) {
                LOG.debug("Response body "+Thread.currentThread().getName());
                getInfoAboutUser();
            }
       }catch(){}
}

public void getInfoAboutUser(){
       LOG.debug("getInfoAboutUser "+Thread.currentThread().getName());

       Executor e = new Executor() {
            @Override
            public void execute(@NonNull Runnable runnable) {
                LOG.debug(" executor thread");
                runnable.run();
            }
        }; 

 Observable.zip(
                Observable.fromIterable(array),
                Observable
                        .interval((mRandom.nextInt(7)+5) * 1000, 
 TimeUnit.MILLISECONDS,Schedulers.from(e))
                        .take(array.size()),
                new BiFunction<String, Long, String>() {
                    @Override
                    public String apply(String s, Long aLong) throws Exception {
                        LOG.debug("Result "+Thread.currentThread().getName());
                        return s;
                    }
                }
       ).flatMapMaybe(new Function<String, MaybeSource<String>>() {
            @Override
            public MaybeSource<String> apply(String s) throws Exception {
                return mRepository.getInfoAboutUser(s); 
            }
        }).subscribeWith(new DisposableObserver<String>() {})
}
我使用的

mRepository.getInfo() 和 mRepository.getInfoAboutUser(s) 方法没有 subscribeOn 和 observeOn!

我的日志是:

等等

如何将 IntentService 线程用于 Observable.zip 和 Interval 方法?我只需要 IntentService 线程

Schedulers.from(e)Executor 包裹在内部 class ExecutorScheduler 中。如果一个任务被延迟安排并且给定的 Executor 不是 ScheduledExecutorService 那么它将使用一个内部调度程序来延迟对 e.execute() 的调用直到延迟完成。

因为你的执行器只是立即执行,所以它最终在 RxSingleScheduler-1.

的辅助调度器上执行

要解决此问题,您需要从以下解决方案中进行选择:

  1. 创建一个 ScheduledExecutorService 将可运行程序正确分派到 IntentService
  2. 创建自定义 Scheduler 将可运行对象分派给 IntentServiceLooper
  3. 使用 RxAndroid 库为您完成 2.。 AndroidSchedulers.from(Looper.myLooper()) 将为 IntentService.
  4. 创建一个调度程序

编辑:请注意,IntentService 和异步操作不能混用。该服务将在 handleIntent returns 时终止,所以这不是很好 执行像 Observable.interval 这样的延迟操作的方法。