RxJava中发值代码和收值代码的线程执行
Thread execution of value emitting code and value receiving code in RxJava
我有以下代码:
private static void log(Object msg) {
System.out.println(
Thread.currentThread().getName() +
": " + msg);
}
Observable<Integer> naturalNumbers = Observable.create(emitter -> {
log("Invoked"); // on main thread
Runnable r = () -> {
log("Invoked on another thread");
int i = 0;
while(!emitter.isDisposed()) {
log("Emitting "+ i);
emitter.onNext(i);
i += 1;
}
};
new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received "+i));
所以这里我们有 2 个重要的 lambda 表达式。第一个是我们传递给 Observable.create 的那个,第二个是我们传递给 Observable.subscribe() 的回调函数。在第一个 lambda 中,我们创建一个新线程,然后在该线程上发出值。在第二个 lambda 中,我们有代码来接收在第一个 lambda 代码中发出的那些值。我观察到这两个代码都在同一个线程上执行。
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
Thread-0: Received 1
Thread-0: Emitting 2
Thread-0: Received 2
为什么会这样?默认情况下,RxJava 是否 运行 代码在同一线程上发出值(可观察)和代码接收值(观察者)?
让我们看看,如果您使用 Thread
执行 运行nable 会发生什么:
测试
@Test
void threadTest() throws Exception {
log("main");
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(
() -> {
log("thread");
countDownLatch.countDown();
})
.start();
countDownLatch.await();
}
输出
main: main
Thread-0: thread
似乎主入口点是从 main
线程调用的,新创建的 Thread
被调用 Thread-0
。
Why is it so? Does RxJava by default run code emitting values(observable) and the code receiving values(observer) on same thread?
默认情况下 RxJava
是单线程的。因此,如果生产者没有通过 observeOn
、subscribeOn
或不同的线程布局进行不同定义,将在 consumer
(订阅者)线程上发出值。这是因为 RxJava
运行 默认情况下订阅堆栈上的所有内容。
示例 2
@Test
void fdskfkjsj() throws Exception {
log("main");
Observable<Integer> naturalNumbers =
Observable.create(
emitter -> {
log("Invoked"); // on main thread
Runnable r =
() -> {
log("Invoked on another thread");
int i = 0;
while (!emitter.isDisposed()) {
log("Emitting " + i);
emitter.onNext(i);
i += 1;
}
};
new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received " + i));
Thread.sleep(100);
}
输出2
main: main
main: Invoked
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
在您的示例中,很明显,main 方法是从主线程调用的。此外,subscribeActual
调用在调用线程 (main
) 上也是 运行。但是 Observable#create
lambda 从新创建的线程 Thread-0
调用 onNext
。该值从调用线程推送到订阅者。在这种情况下,调用线程是 Thread-0
,因为它在下游订阅者上调用 onNext
。
如何区分生产者和消费者?
使用 observeOn
/subscribeOn
运算符来处理 RxJava
.
中的并发
我应该使用 RxJava 的低级 Thread 构造吗?
不,您不应该使用 new Thread
来将生产者与消费者分开。违约很容易,onNext
不能并发调用(交错),因此违约。这就是为什么 RxJava
提供了一个名为 Scheduler
的构造和 Worker
以减少此类错误的原因。
注意:
我认为这篇文章描述得很好:http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html . Please note this is Rx.NET, but the principle is quite the same. If you want to read about concurrency with RxJava
you could also look into Davids Blog (https://akarnokd.blogspot.com/2015/05/schedulers-part-1.html) or read this Book (Reactive Programming with RxJava https://www.oreilly.com/library/view/reactive-programming-with/9781491931646/)
我有以下代码:
private static void log(Object msg) {
System.out.println(
Thread.currentThread().getName() +
": " + msg);
}
Observable<Integer> naturalNumbers = Observable.create(emitter -> {
log("Invoked"); // on main thread
Runnable r = () -> {
log("Invoked on another thread");
int i = 0;
while(!emitter.isDisposed()) {
log("Emitting "+ i);
emitter.onNext(i);
i += 1;
}
};
new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received "+i));
所以这里我们有 2 个重要的 lambda 表达式。第一个是我们传递给 Observable.create 的那个,第二个是我们传递给 Observable.subscribe() 的回调函数。在第一个 lambda 中,我们创建一个新线程,然后在该线程上发出值。在第二个 lambda 中,我们有代码来接收在第一个 lambda 代码中发出的那些值。我观察到这两个代码都在同一个线程上执行。
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
Thread-0: Received 1
Thread-0: Emitting 2
Thread-0: Received 2
为什么会这样?默认情况下,RxJava 是否 运行 代码在同一线程上发出值(可观察)和代码接收值(观察者)?
让我们看看,如果您使用 Thread
执行 运行nable 会发生什么:
测试
@Test
void threadTest() throws Exception {
log("main");
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(
() -> {
log("thread");
countDownLatch.countDown();
})
.start();
countDownLatch.await();
}
输出
main: main
Thread-0: thread
似乎主入口点是从 main
线程调用的,新创建的 Thread
被调用 Thread-0
。
Why is it so? Does RxJava by default run code emitting values(observable) and the code receiving values(observer) on same thread?
默认情况下 RxJava
是单线程的。因此,如果生产者没有通过 observeOn
、subscribeOn
或不同的线程布局进行不同定义,将在 consumer
(订阅者)线程上发出值。这是因为 RxJava
运行 默认情况下订阅堆栈上的所有内容。
示例 2
@Test
void fdskfkjsj() throws Exception {
log("main");
Observable<Integer> naturalNumbers =
Observable.create(
emitter -> {
log("Invoked"); // on main thread
Runnable r =
() -> {
log("Invoked on another thread");
int i = 0;
while (!emitter.isDisposed()) {
log("Emitting " + i);
emitter.onNext(i);
i += 1;
}
};
new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received " + i));
Thread.sleep(100);
}
输出2
main: main
main: Invoked
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
在您的示例中,很明显,main 方法是从主线程调用的。此外,subscribeActual
调用在调用线程 (main
) 上也是 运行。但是 Observable#create
lambda 从新创建的线程 Thread-0
调用 onNext
。该值从调用线程推送到订阅者。在这种情况下,调用线程是 Thread-0
,因为它在下游订阅者上调用 onNext
。
如何区分生产者和消费者?
使用 observeOn
/subscribeOn
运算符来处理 RxJava
.
我应该使用 RxJava 的低级 Thread 构造吗?
不,您不应该使用 new Thread
来将生产者与消费者分开。违约很容易,onNext
不能并发调用(交错),因此违约。这就是为什么 RxJava
提供了一个名为 Scheduler
的构造和 Worker
以减少此类错误的原因。
注意:
我认为这篇文章描述得很好:http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html . Please note this is Rx.NET, but the principle is quite the same. If you want to read about concurrency with RxJava
you could also look into Davids Blog (https://akarnokd.blogspot.com/2015/05/schedulers-part-1.html) or read this Book (Reactive Programming with RxJava https://www.oreilly.com/library/view/reactive-programming-with/9781491931646/)