运行 主线程中的流程
Run Flow in main thread
我再次比较 RxJava 和 Java 9 Flow。我看到默认情况下 Flow 是异步的,我想知道是否有办法使其 运行 同步。
有时我们只是想将它用于 sugar 语法而不是用于 Nio,并拥有更同质的代码。
在 RxJava 中,默认情况下它是同步的,您可以在管道中使用 observerOn
和 subscribeOn
使其 运行 异步。
Flow 中有任何运算符可以在主线程中使其 运行 吗?
此致。
您可以定义自定义 Publisher
,如 Flow
中所述,以使用同步执行。
A very simple publisher that only issues (when requested) a single
TRUE item to a single subscriber. Because the subscriber receives only
a single item, this class does not use buffering and ordering control.
class OneShotPublisher implements Publisher<Boolean> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private boolean subscribed; // true after first subscribe
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed)
subscriber.onError(new IllegalStateException()); // only one allowed
else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // to allow cancellation
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (n != 0 && !completed) {
completed = true;
if (n < 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) future.cancel(false);
}
}
}
没有操作员可以这样做,但 API 允许您控制项目的发布方式。因此,您可以直接从当前线程调用订阅者方法。
class SynchronousPublisher implements Publisher<Data> {
public synchronized void subscribe(Subscriber<? super Data> subscriber) {
subscriber.onSubscribe(new SynchronousSubscription(subscriber));
}
}
static class SynchronousSubscription implements Subscription {
private final Subscriber<? super Data> subscriber;
SynchronousSubscription(Subscriber<? super Data> subscriber) {
this.subscriber = subscriber;
}
public synchronized void request(long n) {
... // prepare item
subscriber.onNext(someItem);
}
...
}
}
这取决于你在主线程上运行的意思。
如果您想强制任意 Flow 在特定线程上执行,没有标准的方法可以执行此操作,除非 Flow 在库中实现,让您覆盖提供异步的部分。在 RxJava 术语中,这些是由 Schedulers
实用程序 class.
提供的 Scheduler
如果要在主线程上观察 Flow,则必须在 Flow.Subscriber
之上编写一个阻塞队列使用者,它会阻塞线程,直到队列中有项目为止。这可能会变得很复杂,因此我将向您介绍 Reactive4JavaFlow 中的 blockingSubscribe
实现。
如果你想将Java主线程用作Executor
/Scheduler
,那就更复杂了,需要类似的阻塞机制以及线程池执行器的一些想法. Reactive4JavaFlow恰好有这样一个Scheduler,你可以通过以下方式将其用作Executor:new SubmissionPublisher<>(128, blockingScheduler::schedule)
.
我再次比较 RxJava 和 Java 9 Flow。我看到默认情况下 Flow 是异步的,我想知道是否有办法使其 运行 同步。
有时我们只是想将它用于 sugar 语法而不是用于 Nio,并拥有更同质的代码。
在 RxJava 中,默认情况下它是同步的,您可以在管道中使用 observerOn
和 subscribeOn
使其 运行 异步。
Flow 中有任何运算符可以在主线程中使其 运行 吗?
此致。
您可以定义自定义 Publisher
,如 Flow
中所述,以使用同步执行。
A very simple publisher that only issues (when requested) a single TRUE item to a single subscriber. Because the subscriber receives only a single item, this class does not use buffering and ordering control.
class OneShotPublisher implements Publisher<Boolean> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private boolean subscribed; // true after first subscribe
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed)
subscriber.onError(new IllegalStateException()); // only one allowed
else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // to allow cancellation
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (n != 0 && !completed) {
completed = true;
if (n < 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) future.cancel(false);
}
}
}
没有操作员可以这样做,但 API 允许您控制项目的发布方式。因此,您可以直接从当前线程调用订阅者方法。
class SynchronousPublisher implements Publisher<Data> {
public synchronized void subscribe(Subscriber<? super Data> subscriber) {
subscriber.onSubscribe(new SynchronousSubscription(subscriber));
}
}
static class SynchronousSubscription implements Subscription {
private final Subscriber<? super Data> subscriber;
SynchronousSubscription(Subscriber<? super Data> subscriber) {
this.subscriber = subscriber;
}
public synchronized void request(long n) {
... // prepare item
subscriber.onNext(someItem);
}
...
}
}
这取决于你在主线程上运行的意思。
如果您想强制任意 Flow 在特定线程上执行,没有标准的方法可以执行此操作,除非 Flow 在库中实现,让您覆盖提供异步的部分。在 RxJava 术语中,这些是由 Schedulers
实用程序 class.
Scheduler
如果要在主线程上观察 Flow,则必须在 Flow.Subscriber
之上编写一个阻塞队列使用者,它会阻塞线程,直到队列中有项目为止。这可能会变得很复杂,因此我将向您介绍 Reactive4JavaFlow 中的 blockingSubscribe
实现。
如果你想将Java主线程用作Executor
/Scheduler
,那就更复杂了,需要类似的阻塞机制以及线程池执行器的一些想法. Reactive4JavaFlow恰好有这样一个Scheduler,你可以通过以下方式将其用作Executor:new SubmissionPublisher<>(128, blockingScheduler::schedule)
.