submit method won't invoke onNext (Java Flow API)
I'm learning the Flow API in Java, and I'm currently creating an example based on one from the Oracle community. The problem is that I don't see the expected output, only the SUBSCRIBING string that is printed inside the onSubscribe method. I already checked and found a related example on Stack Overflow, but it did not help, because I am already calling request(long n).
import java.util.concurrent.Flow;

public class Computer<T> implements Flow.Subscriber<T> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("SUBSCRIBING");
        this.subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        System.out.println(String.format("Got %s", item.toString()));
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("DONE");
    }
}
--
import java.util.List;
import java.util.concurrent.SubmissionPublisher;

public class Sensor {
    public static void main(String[] args) {
        SubmissionPublisher<String> submissionPublisher = new SubmissionPublisher<>();
        Computer<String> subscriber = new Computer<>();
        submissionPublisher.subscribe(subscriber);
        List<String> items = List.of("1.25", "1.224", "1.55");
        items.forEach(submissionPublisher::submit);
        submissionPublisher.close();
    }
}
I only see:
SUBSCRIBING
Why is the onNext method not being called?
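You are not passing a ScheduledExecutorService to the publisher. It is basically an ExecutorService that can schedule tasks to run after a delay or to execute repeatedly with a fixed interval between executions. The underlying reason the output is missing is that the no-argument SubmissionPublisher constructor delivers items asynchronously, normally on ForkJoinPool.commonPool(), whose worker threads are daemon threads, so main can return and the JVM can exit before onNext runs. An executor with non-daemon threads, such as the one in the following version, keeps the JVM alive until the delivery tasks have finished.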
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;

public class Sensor {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        SubmissionPublisher<String> submissionPublisher = new SubmissionPublisher<>(executor, 5);
        Computer<String> subscriber = new Computer<>();
        submissionPublisher.subscribe(subscriber);
        List<String> items = List.of("1.25", "1.224", "1.55");
        items.forEach(submissionPublisher::submit);
        submissionPublisher.close();
        executor.shutdown();
    }
}
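As an alternative to supplying an executor, the default publisher can also be kept if the main thread simply waits for the subscriber to finish. The following is a minimal sketch under that assumption, not the approach from the answer above: WaitingSensor, LatchSubscriber, publishAndWait and the 5-second timeout are hypothetical names and values chosen for illustration. It uses a CountDownLatch that is released in onComplete (or onError), so main blocks until every item has been delivered.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class WaitingSensor {

    // Hypothetical subscriber: records each item and releases a latch when the stream ends.
    public static final class LatchSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            done.countDown(); // do not leave the caller waiting after a failure
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the items with the default executor and blocks until the
    // subscriber has completed (assumed timeout: 5 seconds).
    public static List<String> publishAndWait(List<String> items) throws InterruptedException {
        LatchSubscriber<String> subscriber = new LatchSubscriber<>();
        // SubmissionPublisher is AutoCloseable; close() issues onComplete
        // after the items submitted before it have been delivered.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        }
        if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("subscriber did not complete in time");
        }
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        publishAndWait(List.of("1.25", "1.224", "1.55"))
                .forEach(item -> System.out.println("Got " + item));
        System.out.println("DONE");
    }
}
```

Waiting on a latch keeps the asynchronous delivery of the default pool and only makes the main thread outlive it, which is usually enough for a small demo like this one. In a long-running application the JVM stays alive anyway, so neither the latch nor a dedicated executor would be needed just to see onNext being called.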