Java 9 reactive flows: does one Subscriber belong to one Publisher?
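I would like to know whether a reactive-streams Publisher can safely assume that a subscription belongs to it alone, and therefore call java.util.concurrent.Flow.Subscriber#onComplete when the publisher is going out of service (for example, when it is shut down). The code sample below demonstrates the dilemma (obviously it is just synthetic code to illustrate the problem):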
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Flow;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TimePublisher implements Flow.Publisher<Long> {

        private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        private final ConcurrentLinkedQueue<Flow.Subscriber<? super Long>> subscribersList = new ConcurrentLinkedQueue<>();

        private TimePublisher() {
        }

        public static TimePublisher newInstance() {
            TimePublisher timePublisher = new TimePublisher();
            timePublisher.startTickScheduler();
            return timePublisher;
        }

        private void startTickScheduler() {
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                // does not make too much sense: just for the sake of the example
                final long currentTimeMillis = System.currentTimeMillis();
                subscribersList.forEach(sub -> sub.onNext(currentTimeMillis));
            }, 1, 1, TimeUnit.SECONDS);
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Long> subscriber) {
            subscribersList.add(subscriber);
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    // no-op in this sample
                }

                @Override
                public void cancel() {
                    subscribersList.remove(subscriber);
                }
            });
        }

        public void stop() {
            // the publisher can be stopped from the outside: after that it will
            // definitely not emit any next items.
            scheduledExecutorService.shutdown();
            // QUESTION: can we assume that a Subscriber is subscribed to only this Publisher?
            // if it is subscribed to another publisher, the following is illegal, as onNext
            // could potentially be called by another Publisher...
            subscribersList.forEach(Flow.Subscriber::onComplete);
            subscribersList.clear();
        }
    }
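- When TimePublisher#stop is called, this particular Publisher will definitely not issue any further onNext calls, so calling onComplete seems to be a reasonable choice.
- However, if the Subscriber is also subscribed to another Publisher, calling onComplete may be illegal, because the other Publisher may still be emitting items.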
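To make the second point concrete, here is a minimal sketch of what a shared Subscriber instance would observe. `DirectPublisher` and `RecordingSubscriber` are hypothetical helpers invented for this illustration (they are not part of the JDK); the publisher emits synchronously on the caller's thread and ignores demand, purely to keep the event order deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class SharedSubscriberDemo {

    /** Hypothetical synchronous publisher: emits on the caller's thread and ignores demand. */
    static final class DirectPublisher implements Flow.Publisher<Long> {
        private final List<Flow.Subscriber<? super Long>> subscribers = new ArrayList<>();

        @Override
        public void subscribe(Flow.Subscriber<? super Long> subscriber) {
            subscribers.add(subscriber);
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* demand ignored in this sketch */ }
                @Override public void cancel() { subscribers.remove(subscriber); }
            });
        }

        void emit(long value) {
            // Iterate over a copy so a cancel() during delivery cannot break the loop.
            new ArrayList<>(subscribers).forEach(s -> s.onNext(value));
        }

        void stop() {
            // Same pattern as TimePublisher#stop in the question.
            new ArrayList<>(subscribers).forEach(Flow.Subscriber::onComplete);
            subscribers.clear();
        }
    }

    /** Records every signal it receives, regardless of which Subscription sent it. */
    static final class RecordingSubscriber implements Flow.Subscriber<Long> {
        final List<String> events = new ArrayList<>();
        @Override public void onSubscribe(Flow.Subscription s) { events.add("onSubscribe"); }
        @Override public void onNext(Long item) { events.add("onNext:" + item); }
        @Override public void onError(Throwable t) { events.add("onError"); }
        @Override public void onComplete() { events.add("onComplete"); }
    }

    public static List<String> run() {
        DirectPublisher first = new DirectPublisher();
        DirectPublisher second = new DirectPublisher();
        RecordingSubscriber shared = new RecordingSubscriber();

        first.subscribe(shared);
        second.subscribe(shared);   // the same instance now holds two Subscriptions

        first.emit(1L);
        first.stop();               // onComplete for the first Subscription only
        second.emit(2L);            // the second Subscription is still live
        return shared.events;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onSubscribe, onNext:1, onComplete, onNext:2]
        System.out.println(run());
    }
}
```

Because onComplete() takes no argument, the shared instance has no direct way to tell which of its two Subscriptions has just terminated, and it goes on to receive an onNext after an onComplete.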
The documentation of Subscriber says:
The methods in this interface are invoked in strict sequential order for each Flow.Subscription.
and, for onComplete in particular:
Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription. If this method throws an exception, resulting behavior is undefined.
So it is legal for other Subscriptions to keep invoking methods.
The Flow documentation says that maintaining multiple Subscriptions in one Subscriber implementation is possible, but not recommended:
Because Subscriber method invocations for a given Flow.Subscription are strictly ordered, there is no need for these methods to use locks or volatiles unless a Subscriber maintains multiple Subscriptions (in which case it is better to instead define multiple Subscribers, each with its own Subscription).
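The arrangement the Flow documentation recommends (one Subscriber per Subscription) could look roughly like the sketch below. `LabeledSubscriber` and the `stub` helper are hypothetical names made up for this illustration, and the signals are issued by hand to stand in for two independent publishers. Cancelling an extra Subscription in onSubscribe is a defensive choice made for this sketch; the Javadoc quoted above does not require it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class PerSourceSubscribers {

    /** Hypothetical subscriber bound to at most one Subscription; tags each signal with a label. */
    static final class LabeledSubscriber implements Flow.Subscriber<Long> {
        private final String label;
        private final List<String> log;
        private Flow.Subscription subscription;

        LabeledSubscriber(String label, List<String> log) {
            this.label = label;
            this.log = log;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            if (subscription != null) {
                s.cancel();          // assumption of this sketch: refuse a second Subscription
                return;
            }
            subscription = s;
            s.request(Long.MAX_VALUE);
        }

        @Override public void onNext(Long item) { log.add(label + " onNext " + item); }
        @Override public void onError(Throwable t) { log.add(label + " onError"); }
        @Override public void onComplete() { log.add(label + " onComplete"); }
    }

    /** Hypothetical stand-in for a publisher-side Subscription; only logs cancellation. */
    static Flow.Subscription stub(List<String> log, String name) {
        return new Flow.Subscription() {
            @Override public void request(long n) { /* demand ignored in this sketch */ }
            @Override public void cancel() { log.add(name + " cancelled"); }
        };
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();
        LabeledSubscriber clock = new LabeledSubscriber("clock", log);
        LabeledSubscriber other = new LabeledSubscriber("other", log);

        clock.onSubscribe(stub(log, "clock-subscription"));
        clock.onSubscribe(stub(log, "extra-subscription")); // refused and cancelled
        other.onSubscribe(stub(log, "other-subscription"));

        clock.onNext(1L);
        clock.onComplete();   // terminates only the "clock" stream
        other.onNext(2L);     // a different Subscriber with its own Subscription
        return log;
    }

    public static void main(String[] args) {
        // prints [extra-subscription cancelled, clock onNext 1, clock onComplete, other onNext 2]
        System.out.println(run());
    }
}
```

With this shape, each onComplete is unambiguous on the consumer side: it can only refer to the single Subscription that the receiving instance holds.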