Java 9 - 发布者和订阅者如何工作
Java 9 - how publisher and subscriber works
我想了解 Subscriber
和 Publisher
在 java 9.
中的工作原理
我在这里创建了一个 subscriber
并使用 SubmissionPublisher
发布项目。
我正在尝试将 100 个字符串发布到 subscriber
。如果我没有将 Client
程序设为 sleep
( 请参阅 MyReactiveApp
中的注释代码),我看不到所有项目都已发布。
为什么不等待这里处理的所有字符串:
strs.stream().forEach(i -> publisher.submit(i)); // what happens here?
如果我将上面的代码替换为,我会看到所有字符串都打印在控制台中
strs.stream().forEach(System.out::println);
使用SubmissionPublisher
发布的客户端程序。
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class MyReactiveApp {
public static void main(String args[]) throws InterruptedException {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
MySubscriber subs = new MySubscriber();
publisher.subscribe(subs);
List<String> strs = getStrs();
System.out.println("Publishing Items to Subscriber");
strs.stream().forEach(i -> publisher.submit(i));
/*while (strs.size() != subs.getCounter()) {
Thread.sleep(10);
}*/
//publisher.close();
System.out.println("Exiting the app");
}
private static List<String> getStrs(){
return Stream.generate(new Supplier<String>() {
int i =1;
@Override
public String get() {
return "name "+ (i++);
}
}).limit(100).collect(Collectors.toList());
}
}
订户
import java.util.concurrent.Flow.Subscription;
public class MySubscriber implements java.util.concurrent.Flow.Subscriber<String>{
private Subscription subscription;
private int counter = 0;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(100);
}
@Override
public void onNext(String item) {
System.out.println(this.getClass().getSimpleName()+" item "+item);
//subscription.request(1);
counter++;
}
@Override
public void onError(Throwable throwable) {
System.out.println(this.getClass().getName()+ " an error occured "+throwable);
}
@Override
public void onComplete() {
System.out.println("activity completed");
}
public int getCounter() {
return counter;
}
}
输出:
Publishing Items to Subscriber
MySubscriber item name 1
MySubscriber item name 2
MySubscriber item name 3
MySubscriber item name 4
MySubscriber item name 5
Exiting the app
MySubscriber item name 6
MySubscriber item name 7
MySubscriber item name 8
MySubscriber item name 9
MySubscriber item name 10
MySubscriber item name 11
MySubscriber item name 12
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
Creates a new SubmissionPublisher using the ForkJoinPool.commonPool() for async delivery to subscribers
所以实际上
strs.stream().forEach(i -> publisher.submit(i));
将所有提交排队并在另一个线程上异步传送它们。但随后应用程序被终止。这与工作线程的进度无关。这意味着无论工作线程已经传送了多少元素,应用程序都会终止。
每个 运行 可以不同。在最坏的情况下,应用程序可能会在交付第一个项目之前终止。
线程数
如果你想验证MyReactiveApp的main方法和MySubscriber的onNext中的传递发生在不同的线程上,你可以打印出相应线程的名称,例如在 MyReactiveApp 的主目录中:
System.out.println(Thread.currentThread().getName())
将输出 main
作为线程名称。
而 MySubscriber 的 onNext 方法将例如输出类似 ForkJoinPool.commonPool-worker-1
的内容。
用户线程和守护线程
尽管我们还有一个 运行ning 线程,为什么应用程序终止了?
Java中有两种线程:
- 用户线程
- 守护线程
当不再有任何用户线程 运行ning 时,Java 程序终止,即使后台线程仍在 运行ning。
主线程是用户线程。 SubmissionPublisher 在此处使用来自 ForkJoinPool.commonPool() 的工作线程。这些是守护线程。
All worker threads are initialized with Thread.isDaemon() set true.
https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ForkJoinPool.html
我想了解 Subscriber
和 Publisher
在 java 9.
我在这里创建了一个 subscriber
并使用 SubmissionPublisher
发布项目。
我正在尝试将 100 个字符串发布到 subscriber
。如果我没有将 Client
程序设为 sleep
( 请参阅 MyReactiveApp
中的注释代码),我看不到所有项目都已发布。
为什么不等待这里处理的所有字符串:
strs.stream().forEach(i -> publisher.submit(i)); // what happens here?
如果我将上面的代码替换为,我会看到所有字符串都打印在控制台中
strs.stream().forEach(System.out::println);
使用SubmissionPublisher
发布的客户端程序。
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class MyReactiveApp {
public static void main(String args[]) throws InterruptedException {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
MySubscriber subs = new MySubscriber();
publisher.subscribe(subs);
List<String> strs = getStrs();
System.out.println("Publishing Items to Subscriber");
strs.stream().forEach(i -> publisher.submit(i));
/*while (strs.size() != subs.getCounter()) {
Thread.sleep(10);
}*/
//publisher.close();
System.out.println("Exiting the app");
}
private static List<String> getStrs(){
return Stream.generate(new Supplier<String>() {
int i =1;
@Override
public String get() {
return "name "+ (i++);
}
}).limit(100).collect(Collectors.toList());
}
}
订户
import java.util.concurrent.Flow.Subscription;
public class MySubscriber implements java.util.concurrent.Flow.Subscriber<String>{
private Subscription subscription;
private int counter = 0;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(100);
}
@Override
public void onNext(String item) {
System.out.println(this.getClass().getSimpleName()+" item "+item);
//subscription.request(1);
counter++;
}
@Override
public void onError(Throwable throwable) {
System.out.println(this.getClass().getName()+ " an error occured "+throwable);
}
@Override
public void onComplete() {
System.out.println("activity completed");
}
public int getCounter() {
return counter;
}
}
输出:
Publishing Items to Subscriber
MySubscriber item name 1
MySubscriber item name 2
MySubscriber item name 3
MySubscriber item name 4
MySubscriber item name 5
Exiting the app
MySubscriber item name 6
MySubscriber item name 7
MySubscriber item name 8
MySubscriber item name 9
MySubscriber item name 10
MySubscriber item name 11
MySubscriber item name 12
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
Creates a new SubmissionPublisher using the ForkJoinPool.commonPool() for async delivery to subscribers
所以实际上
strs.stream().forEach(i -> publisher.submit(i));
将所有提交排队并在另一个线程上异步传送它们。但随后应用程序被终止。这与工作线程的进度无关。这意味着无论工作线程已经传送了多少元素,应用程序都会终止。
每个 运行 可以不同。在最坏的情况下,应用程序可能会在交付第一个项目之前终止。
线程数
如果你想验证MyReactiveApp的main方法和MySubscriber的onNext中的传递发生在不同的线程上,你可以打印出相应线程的名称,例如在 MyReactiveApp 的主目录中:
System.out.println(Thread.currentThread().getName())
将输出 main
作为线程名称。
而 MySubscriber 的 onNext 方法将例如输出类似 ForkJoinPool.commonPool-worker-1
的内容。
用户线程和守护线程
尽管我们还有一个 运行ning 线程,为什么应用程序终止了?
Java中有两种线程:
- 用户线程
- 守护线程
当不再有任何用户线程 运行ning 时,Java 程序终止,即使后台线程仍在 运行ning。
主线程是用户线程。 SubmissionPublisher 在此处使用来自 ForkJoinPool.commonPool() 的工作线程。这些是守护线程。
All worker threads are initialized with Thread.isDaemon() set true.
https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ForkJoinPool.html