Java 9 Flow SubmissionPublisher 提供方法的行为
Java 9 Behavior of Flow SubmissionPublisher offer method
我一直在玩 Java Flow offer
运算符,但在阅读文档并进行测试后我不明白。
这是我的测试
@Test
public void offer() throws InterruptedException {
//Create Publisher for expected items Strings
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.offer("item", (subscriber, value) -> false);
Thread.sleep(500);
}
offer 运算符接收要发出的项目和 BiPredicate 函数,据我阅读文档的理解,只有在谓词函数为真时才会发出项目。
Bur 通过测试后的结果是
Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback
如果 I return true 而不是 false,结果没有变化。
任何人都可以向我解释一下这个运算符。
不,谓词函数用于决定是否重试 docs中提到的发布操作:
onDrop
- if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)
不影响是否最初发送该项目。
编辑:使用 offer
方法时如何发生掉落的示例
我想出了一个示例,说明调用 offer
方法时如何发生掉落。我不认为输出是 100% 确定性的,但是当它是 运行 几次时有明显的区别。您可以将处理程序更改为 return true 而不是 false,以查看重试如何减少由于缓冲区饱和而导致的丢弃。在此示例中,丢弃通常会发生,因为最大缓冲区容量明显很小(传递给 SubmissionPublisher
的构造函数)。但是当在一个小的休眠期后启用重试时,掉落被移除:
public class SubmissionPubliserDropTest {
public static void main(String[] args) throws InterruptedException {
// Create Publisher for expected items Strings
// Note the small buffer max capacity to be able to cause drops
SubmissionPublisher<String> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
// Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
// publish 3 items for each subscriber
for(int i = 0; i < 3; i++) {
int result = publisher.offer("item" + i, (subscriber, value) -> {
// sleep for a small period before deciding whether to retry or not
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false; // you can switch to true to see that drops are reduced
});
// show the number of dropped items
if(result < 0) {
System.err.println("dropped: " + result);
}
}
Thread.sleep(3000);
publisher.close();
}
}
class CustomSubscriber<T> implements Flow.Subscriber<T> {
private Subscription sub;
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable th) {
th.printStackTrace();
sub.cancel();
}
@Override
public void onNext(T arg0) {
System.out.println("Got : " + arg0 + " --> onNext() callback");
sub.request(1);
}
@Override
public void onSubscribe(Subscription sub) {
System.out.println("Subscription done");
this.sub = sub;
sub.request(1);
}
}
The item may be dropped by one or more subscribers if resource limits
are exceeded, in which case the given handler (if non-null) is
invoked, and if it returns true
, retried once.
只是为了理解,在你的两个电话中
publisher.offer("item", (subscriber, value) -> true); // the handler would be invoked
publisher.offer("item", (subscriber, value) -> false); // the handler wouldn't be invoked
但 publisher
仍然会向其当前的每个订阅者发布给定的项目。 这发生在您当前的场景。
就资源限制而言,通过尝试重现来验证您提供的处理程序是否被调用的场景很困难,正如文档所建议的:
The item may be dropped by one or more subscribers if resource limits
are exceeded, in which case the given handler (if non-null) is
invoked, and if it returns true, retried once.
但是您可以尝试删除超时设置为基本最小值的项目,使用
offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
的重载方法
timeout
- how long to wait for resources for any subscriber before
giving up, in units of unit
unit
- a TimeUnit determining how to
interpret the timeout parameter
由于 offer
方法可能会丢弃项目(立即或使用 有限超时 ),这将提供一个机会插入处理程序,然后重试。
我一直在玩 Java Flow offer
运算符,但在阅读文档并进行测试后我不明白。
这是我的测试
@Test
public void offer() throws InterruptedException {
//Create Publisher for expected items Strings
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.offer("item", (subscriber, value) -> false);
Thread.sleep(500);
}
offer 运算符接收要发出的项目和 BiPredicate 函数,据我阅读文档的理解,只有在谓词函数为真时才会发出项目。
Bur 通过测试后的结果是
Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback
如果 I return true 而不是 false,结果没有变化。
任何人都可以向我解释一下这个运算符。
不,谓词函数用于决定是否重试 docs中提到的发布操作:
onDrop
- if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)
不影响是否最初发送该项目。
编辑:使用 offer
方法时如何发生掉落的示例
我想出了一个示例,说明调用 offer
方法时如何发生掉落。我不认为输出是 100% 确定性的,但是当它是 运行 几次时有明显的区别。您可以将处理程序更改为 return true 而不是 false,以查看重试如何减少由于缓冲区饱和而导致的丢弃。在此示例中,丢弃通常会发生,因为最大缓冲区容量明显很小(传递给 SubmissionPublisher
的构造函数)。但是当在一个小的休眠期后启用重试时,掉落被移除:
public class SubmissionPubliserDropTest {
public static void main(String[] args) throws InterruptedException {
// Create Publisher for expected items Strings
// Note the small buffer max capacity to be able to cause drops
SubmissionPublisher<String> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
// Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
// publish 3 items for each subscriber
for(int i = 0; i < 3; i++) {
int result = publisher.offer("item" + i, (subscriber, value) -> {
// sleep for a small period before deciding whether to retry or not
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false; // you can switch to true to see that drops are reduced
});
// show the number of dropped items
if(result < 0) {
System.err.println("dropped: " + result);
}
}
Thread.sleep(3000);
publisher.close();
}
}
class CustomSubscriber<T> implements Flow.Subscriber<T> {
private Subscription sub;
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable th) {
th.printStackTrace();
sub.cancel();
}
@Override
public void onNext(T arg0) {
System.out.println("Got : " + arg0 + " --> onNext() callback");
sub.request(1);
}
@Override
public void onSubscribe(Subscription sub) {
System.out.println("Subscription done");
this.sub = sub;
sub.request(1);
}
}
The item may be dropped by one or more subscribers if resource limits are exceeded, in which case the given handler (if non-null) is invoked, and if it returns
true
, retried once.
只是为了理解,在你的两个电话中
publisher.offer("item", (subscriber, value) -> true); // the handler would be invoked
publisher.offer("item", (subscriber, value) -> false); // the handler wouldn't be invoked
但 publisher
仍然会向其当前的每个订阅者发布给定的项目。 这发生在您当前的场景。
就资源限制而言,通过尝试重现来验证您提供的处理程序是否被调用的场景很困难,正如文档所建议的:
The item may be dropped by one or more subscribers if resource limits are exceeded, in which case the given handler (if non-null) is invoked, and if it returns true, retried once.
但是您可以尝试删除超时设置为基本最小值的项目,使用
offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
timeout
- how long to wait for resources for any subscriber before giving up, in units of unit
unit
- a TimeUnit determining how to interpret the timeout parameter
由于 offer
方法可能会丢弃项目(立即或使用 有限超时 ),这将提供一个机会插入处理程序,然后重试。