RxJava - 为什么执行者只使用一个线程

RxJava - Why the executors only use one thread

我已经创建了一个固定的线程池来处理每 300 毫秒发出的事件并假设该过程需要 1000 毫秒。假设多线程可以工作,但只有一个线程被重用。

如果我设置的sleepTime小于300ms,处理线程会改变,但那是没有用的。

问题:我该怎么做才能使它并发?为什么程序重用线程?

提前致谢

public static void main(String[] args) throws InterruptedException {
    long sleepTime = 1000;
    ExecutorService e = Executors.newFixedThreadPool(3);

    Observable.interval(300, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.computation())
    .flatMap(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long pT) {
            return Observable.just(pT).subscribeOn(Schedulers.from(e));
        }
    })
    .doOnNext(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    })
    .subscribe(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    });


    Thread.sleep(50000);
    e.shutdownNow();

}

日志

i am 0in thread:pool-1-thread-1
i am 1in thread:pool-1-thread-1
i am 2in thread:pool-1-thread-1
i am 3in thread:pool-1-thread-1
i am 4in thread:pool-1-thread-1
i am 5in thread:pool-1-thread-1
i am 6in thread:pool-1-thread-1
i am 7in thread:pool-1-thread-1
i am 8in thread:pool-1-thread-1
i am 9in thread:pool-1-thread-1
i am 10in thread:pool-1-thread-1
i am 11in thread:pool-1-thread-1

根据我对您的代码的理解,生产者的生产速度比订阅者快。然而 Observable<Long> interval(long interval, TimeUnit unit) 实际上不支持 Backpressure。文档指出

This operator does not support backpressure as it uses time. If the downstream needs a slower it should slow the timer or use something like {@link #onBackpressureDrop}.

如果你的处理速度真的比生产者慢,你可以在你的订阅者代码中做这样的事情

.subscribe(new Action1<Long>() {

    @Override
    public void call(Long pT) {
        e.submit(new Runnable() {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    }
});

相反

 .subscribeOn(Schedulers.computation())

尝试

 .observeOn(Schedulers.computation())

我之前用 Rx 玩并发的这个例子作为例子非常好

   public class ObservableZip {

private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;

@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
                                                                                        .concat(s3))
              .subscribe(result -> showResult("Async:", start, result));
}




public void showResult(String transactionType, long start, String result) {
    System.out.println(result + " " +
                               transactionType + String.valueOf(System.currentTimeMillis() - start));
}


public Observable<String> obAsyncString() {
    return Observable.just("")
                     .observeOn(scheduler)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

public Observable<String> obAsyncString1() {
    return Observable.just("")
                     .observeOn(scheduler1)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

public Observable<String> obAsyncString2() {
    return Observable.just("")
                     .observeOn(scheduler2)
                     .doOnNext(val -> {
                         System.out.println("Thread " +  Thread.currentThread()
                                                               .getName());
                     })
                     .map(val -> "!");
  }

 }

我在 GitHub 中找到了答案!

内部 observable 确实在 multi-thread 上发射,但接下来的后续行动却没有。如果我想要它并行,我应该在内部 observable 中进行。