Java 9 reactive flows: does one Subscriber belong to one Publisher?

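I would like to know whether a reactive-streams Publisher can safely assume that a subscription belongs to it alone, and therefore call java.util.concurrent.Flow.Subscriber#onComplete when the Publisher is about to go out of service (for example, when it is shut down). The code sample below demonstrates the dilemma (obviously it is just synthetic code to illustrate the problem):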

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimePublisher implements Flow.Publisher<Long> {

    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

    private final ConcurrentLinkedQueue<Flow.Subscriber<? super Long>> subscribersList = new ConcurrentLinkedQueue<>();


    private TimePublisher() {

    }

    public static TimePublisher newInstance() {
        TimePublisher timePublisher = new TimePublisher();
        timePublisher.startTickScheduler();

        return timePublisher;
    }

    private void startTickScheduler() {
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            // does not make too much sense: just for the sake of the example
            final long currentTimeMillis = System.currentTimeMillis();
            subscribersList.forEach(sub -> sub.onNext(currentTimeMillis));
        }, 1, 1, TimeUnit.SECONDS);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Long> subscriber) {

        subscribersList.add(subscriber);

        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // no-op in this sample
            }

            @Override
            public void cancel() {
                subscribersList.remove(subscriber);
            }
        });
    }

    public void stop() {
        // the publisher can be stopped from the outside: after that it will
        // definitely not emit any next items.
        scheduledExecutorService.shutdown();

        // QUESTION: can we assume that a Subscriber is subscribed to only this Publisher?
        // if it is subscribed to another publisher, the following is illegal, as onNext
        // could potentially be called by another Publisher...
        subscribersList.forEach(Flow.Subscriber::onComplete);

        subscribersList.clear();
    }
}

The documentation for Subscriber says:

The methods in this interface are invoked in strict sequential order for each Flow.Subscription.

And for onComplete in particular:

Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription. If this method throws an exception, resulting behavior is undefined.

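So, as I read it, the termination applies per Subscription: it would still be legal for other Subscriptions to keep invoking methods on the same Subscriber.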
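To make that reading concrete, here is a minimal sketch of one Subscriber instance subscribed to two publishers. The names SharedSubscriberDemo, LoggingSubscriber and single are hypothetical helpers invented for this illustration, and the publisher is a deliberately simplified synchronous one that emits a single item and then completes. The shared Subscriber sees onNext after an earlier onComplete, because the two signals come from different Subscriptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class SharedSubscriberDemo {

    /** Hypothetical subscriber that logs every signal, whichever Subscription sent it. */
    static final class LoggingSubscriber implements Flow.Subscriber<Long> {
        final List<String> log = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription s) {
            log.add("subscribe");
            s.request(1);
        }
        @Override public void onNext(Long item) { log.add("next:" + item); }
        @Override public void onError(Throwable t) { log.add("error"); }
        @Override public void onComplete() { log.add("complete"); }
    }

    /** Hypothetical synchronous publisher: emits one item on the first request, then completes. */
    static Flow.Publisher<Long> single(Long value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override public void request(long n) {
                if (done || n <= 0) {
                    return;
                }
                done = true;
                subscriber.onNext(value);
                subscriber.onComplete(); // terminates this Subscription only
            }

            @Override public void cancel() { done = true; }
        });
    }

    /** Subscribes the same Subscriber instance to two publishers and returns its log. */
    static List<String> run() {
        LoggingSubscriber shared = new LoggingSubscriber();
        single(1L).subscribe(shared);
        single(2L).subscribe(shared);
        return shared.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the log contains "complete" followed later by "next:2", which is exactly the situation the comment in stop() worries about.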

The Flow documentation says that having multiple Subscriptions in one Subscriber implementation is possible, but not recommended:

Because Subscriber method invocations for a given Flow.Subscription are strictly ordered, there is no need for these methods to use locks or volatiles unless a Subscriber maintains multiple Subscriptions (in which case it is better to instead define multiple Subscribers, each with its own Subscription).
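A rough sketch of what that recommendation might look like in practice: instead of handing the same Subscriber to several publishers, each publisher gets its own Subscriber instance, so an onComplete from one source only ends that instance. PerSubscriptionDemo, NamedSubscriber and single are hypothetical names made up for this example, and the publisher is again a simplified synchronous one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class PerSubscriptionDemo {

    /** Hypothetical subscriber that owns exactly one Subscription and tags its signals with a name. */
    static final class NamedSubscriber implements Flow.Subscriber<Long> {
        private final String name;
        private final List<String> log;

        NamedSubscriber(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
        @Override public void onNext(Long item) { log.add(name + ":next:" + item); }
        @Override public void onError(Throwable t) { log.add(name + ":error"); }
        @Override public void onComplete() { log.add(name + ":complete"); }
    }

    /** Hypothetical synchronous publisher: emits one item on the first request, then completes. */
    static Flow.Publisher<Long> single(Long value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override public void request(long n) {
                if (done || n <= 0) {
                    return;
                }
                done = true;
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override public void cancel() { done = true; }
        });
    }

    /** One Subscriber per publisher; both write to a shared log (single-threaded here). */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        single(1L).subscribe(new NamedSubscriber("a", log));
        single(2L).subscribe(new NamedSubscriber("b", log));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With this layout, a publisher that calls onComplete on a Subscriber it was given cannot interfere with signals from another publisher, since no other publisher holds that instance. Whether a Publisher may rely on callers doing this is still the open question.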