查询 RxJava observeOn 调度线程
Query on RxJava observeOn scheduler thread
我必须根据传入的请求写入文件。由于多个请求可能同时到来,我不希望多个线程试图一起覆盖文件内容,这可能会导致丢失一些数据。
因此,我尝试使用 PublishSubject
的实例变量收集所有请求的数据。我在初始化期间订阅了 publishSubject
并且此订阅将在应用程序的整个生命周期中保持不变。我也在一个单独的线程(由 Vertx 事件循环提供)上观察同一个实例,它调用负责写入文件的方法。
private PublishSubject<FileData> publishSubject = PublishSubject.create();
private void init() {
publishSubject.observeOn(RxHelper.blockingScheduler(vertx)).subscribe(fileData -> writeData(fileData));
}
稍后在请求处理期间,我调用 onNext
如下:
handleRequest() {
//do some task
publishSubject.onNext(fileData);
}
我了解到,当我调用 onNext
时,数据将排队,由 observeOn
运算符分配的特定线程写入文件。但是,我想了解的是
- 此线程是否仅为此阻塞在 WAITING 状态
任务?或者,
- 没有文件时是否也用于其他活动
写作发生?
我不想因为使用这种方法而导致 vertx 事件循环中的一个线程浪费在等待状态。另外,如果有的话,请提出更好的方法。
提前致谢。
实际上 RxJava 会为你做这件事,根据定义 onNext()
排放将以串行方式运行:
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications. (Observable Contract)
因此,只要您 运行 阻止订阅者 onNext()
内的调用(并且不会手动将工作分叉到不同的线程)就可以了,并且不会有并行写入会发生。
其实,你的担心应该来自相反的方向——背压。
你应该在这里选择你的背压策略,就好像请求来得更快然后你处理它们(写入文件)你可能会溢出缓冲区并陷入麻烦。 (考虑使用 Flowable 并根据您的需要选择您的背压策略。
关于您的问题,这取决于调度程序,您使用的 RxHelper.blockingScheduler(vertx)
似乎是您的自定义代码,所以我无法判断调度程序是否以工作队列方式使用共享线程它不会闲着。
无论如何,Rx 不会为你决定这个,调度器的职责是根据它的逻辑将工作分配给某个线程。
我必须根据传入的请求写入文件。由于多个请求可能同时到来,我不希望多个线程试图一起覆盖文件内容,这可能会导致丢失一些数据。
因此,我尝试使用 PublishSubject
的实例变量收集所有请求的数据。我在初始化期间订阅了 publishSubject
并且此订阅将在应用程序的整个生命周期中保持不变。我也在一个单独的线程(由 Vertx 事件循环提供)上观察同一个实例,它调用负责写入文件的方法。
private PublishSubject<FileData> publishSubject = PublishSubject.create();
private void init() {
publishSubject.observeOn(RxHelper.blockingScheduler(vertx)).subscribe(fileData -> writeData(fileData));
}
稍后在请求处理期间,我调用 onNext
如下:
handleRequest() {
//do some task
publishSubject.onNext(fileData);
}
我了解到,当我调用 onNext
时,数据将排队,由 observeOn
运算符分配的特定线程写入文件。但是,我想了解的是
- 此线程是否仅为此阻塞在 WAITING 状态 任务?或者,
- 没有文件时是否也用于其他活动 写作发生? 我不想因为使用这种方法而导致 vertx 事件循环中的一个线程浪费在等待状态。另外,如果有的话,请提出更好的方法。
提前致谢。
实际上 RxJava 会为你做这件事,根据定义 onNext()
排放将以串行方式运行:
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications. (Observable Contract)
因此,只要您 运行 阻止订阅者 onNext()
内的调用(并且不会手动将工作分叉到不同的线程)就可以了,并且不会有并行写入会发生。
其实,你的担心应该来自相反的方向——背压。
你应该在这里选择你的背压策略,就好像请求来得更快然后你处理它们(写入文件)你可能会溢出缓冲区并陷入麻烦。 (考虑使用 Flowable 并根据您的需要选择您的背压策略。
关于您的问题,这取决于调度程序,您使用的 RxHelper.blockingScheduler(vertx)
似乎是您的自定义代码,所以我无法判断调度程序是否以工作队列方式使用共享线程它不会闲着。
无论如何,Rx 不会为你决定这个,调度器的职责是根据它的逻辑将工作分配给某个线程。