为什么我无法从 Flow API Subscriber 中的 onComplete 访问 class 变量
Why can I not access class variables from onComplete in Flow API Subscriber
使用JavaFlowAPI。我有一个 Processor
链和一个终端 Subscriber
。
我已经编写了 Subscriber
s 来记录收到的物品数量,这些物品在 SubmissionPublisher
.
下可以正常工作
但是,当我将值从 Processor
(扩展 SubmissionPublisher
)传递到下面的 Subscriber
时,控制台不会打印任何输出。
如果我删除 minValue.get()
,文本将打印到控制台。
为什么调用 class 变量导致 onComplete
没有按预期执行?
public class MinValue implements Subscriber<Integer> {
private Flow.Subscription subscription;
private AtomicInteger minValue=new AtomicInteger(Integer.MAX_VALUE);
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
if(minValue.get()>item){
System.out.println("Min Value : "+item);
minValue.set(item);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error + throwable.getMessage() \nMin Value is : "+minValue.get());
}
@Override
public void onComplete() {
System.out.println("Successful Completion - Min Value is : "+minValue.get());
}
}
编辑 - 添加最小可重现示例
public class NewClass {
public static void main(String args[]) {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
Subscriber<Integer> subscriber = new MinValue();
publisher.subscribe(subscriber);
publisher.submit(10);
publisher.submit(11);
publisher.submit(9);
publisher.submit(12);
publisher.submit(8);
publisher.close();
}
}
minValue.get() returns 一个 int
而 item 是一个 Integer
对象。也许这就是问题所在。
问题是主线程在调用订阅者 onComplete() 之前退出。
如果我在调用 publisher.close() 后添加睡眠,subscriber.onComplete() 将有时间在其自己的线程上执行。
使用JavaFlowAPI。我有一个 Processor
链和一个终端 Subscriber
。
我已经编写了 Subscriber
s 来记录收到的物品数量,这些物品在 SubmissionPublisher
.
但是,当我将值从 Processor
(扩展 SubmissionPublisher
)传递到下面的 Subscriber
时,控制台不会打印任何输出。
如果我删除 minValue.get()
,文本将打印到控制台。
为什么调用 class 变量导致 onComplete
没有按预期执行?
public class MinValue implements Subscriber<Integer> {
private Flow.Subscription subscription;
private AtomicInteger minValue=new AtomicInteger(Integer.MAX_VALUE);
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
if(minValue.get()>item){
System.out.println("Min Value : "+item);
minValue.set(item);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error + throwable.getMessage() \nMin Value is : "+minValue.get());
}
@Override
public void onComplete() {
System.out.println("Successful Completion - Min Value is : "+minValue.get());
}
}
编辑 - 添加最小可重现示例
public class NewClass {
public static void main(String args[]) {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
Subscriber<Integer> subscriber = new MinValue();
publisher.subscribe(subscriber);
publisher.submit(10);
publisher.submit(11);
publisher.submit(9);
publisher.submit(12);
publisher.submit(8);
publisher.close();
}
}
minValue.get() returns 一个 int
而 item 是一个 Integer
对象。也许这就是问题所在。
问题是主线程在调用订阅者 onComplete() 之前退出。
如果我在调用 publisher.close() 后添加睡眠,subscriber.onComplete() 将有时间在其自己的线程上执行。