Java 9 Flow SubmissionPublisher 提供方法的行为

Java 9 Behavior of Flow SubmissionPublisher offer method

我一直在玩 Java Flow offer 运算符,但在阅读文档并进行测试后我不明白。

这是我的测试

@Test
public void offer() throws InterruptedException {
    //Create Publisher for expected items Strings
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    //Register Subscriber
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.offer("item", (subscriber, value) -> false);
    Thread.sleep(500);
}

offer 运算符接收要发出的项目和 BiPredicate 函数,据我阅读文档的理解,只有在谓词函数为真时才会发出项目。

Bur 通过测试后的结果是

Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback

如果 I return true 而不是 false,结果没有变化。

任何人都可以向我解释一下这个运算符。

不,谓词函数用于决定是否重试 docs中提到的发布操作:

onDrop - if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)

不影响是否最初发送该项目。

编辑:使用 offer 方法时如何发生掉落的示例

我想出了一个示例,说明调用 offer 方法时如何发生掉落。我不认为输出是 100% 确定性的,但是当它是 运行 几次时有明显的区别。您可以将处理程序更改为 return true 而不是 false,以查看重试如何减少由于缓冲区饱和而导致的丢弃。在此示例中,丢弃通常会发生,因为最大缓冲区容量明显很小(传递给 SubmissionPublisher 的构造函数)。但是当在一个小的休眠期后启用重试时,掉落被移除:

public class SubmissionPubliserDropTest {

    public static void main(String[] args) throws InterruptedException {
        // Create Publisher for expected items Strings
        // Note the small buffer max capacity to be able to cause drops
        SubmissionPublisher<String> publisher =
                               new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
        // Register Subscriber
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        // publish 3 items for each subscriber
        for(int i = 0; i < 3; i++) {
            int result = publisher.offer("item" + i, (subscriber, value) -> {
                // sleep for a small period before deciding whether to retry or not
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return false;  // you can switch to true to see that drops are reduced
            });
            // show the number of dropped items
            if(result < 0) {
                System.err.println("dropped: " + result);
            }
        }
        Thread.sleep(3000);
        publisher.close();
    }
}

class CustomSubscriber<T> implements Flow.Subscriber<T> {

    private Subscription sub;

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    @Override
    public void onError(Throwable th) {
        th.printStackTrace();
        sub.cancel();
    }

    @Override
    public void onNext(T arg0) {
        System.out.println("Got : " + arg0 + " --> onNext() callback");
        sub.request(1);
    }

    @Override
    public void onSubscribe(Subscription sub) {
        System.out.println("Subscription done");
        this.sub = sub;
        sub.request(1);
    }

}

SubmissionPublisher.offer 表示

The item may be dropped by one or more subscribers if resource limits are exceeded, in which case the given handler (if non-null) is invoked, and if it returns true, retried once.

只是为了理解,在你的两个电话中

publisher.offer("item", (subscriber, value) -> true); // the handler would be invoked

publisher.offer("item", (subscriber, value) -> false); // the handler wouldn't be invoked

publisher 仍然会向其当前的每个订阅者发布给定的项目。 这发生在您当前的场景。


就资源限制而言,通过尝试重现来验证您提供的处理程序是否被调用的场景很困难,正如文档所建议的:

The item may be dropped by one or more subscribers if resource limits are exceeded, in which case the given handler (if non-null) is invoked, and if it returns true, retried once.

但是您可以尝试删除超时设置为基本最小值的项目,使用 offer​(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)

的重载方法

timeout - how long to wait for resources for any subscriber before giving up, in units of unit

unit - a TimeUnit determining how to interpret the timeout parameter

由于 offer 方法可能会丢弃项目(立即或使用 有限超时 ),这将提供一个机会插入处理程序,然后重试。