如何将订阅者连接到 reactor.core.publisher.Flux?
How to connect a Subscriber with a reactor.core.publisher.Flux?
我想将订阅者与 Reactor Flux 连接起来。但是我的小例子没有产生任何输出:
public static void main(String[] args) throws InterruptedException {
Flux.just("a", "b", "c")
.subscribe(new BaseSubscriber<String>() {
@Override
protected void hookOnSubscribe(Subscription ignored) {
}
@Override
protected void hookOnNext(String value) {
System.out.print(value);
}
});
Thread.sleep(1000);
}
我用 org.reactivestreams.Subscriber 做了同样的尝试:
Flux.just("a", "b", "c")
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
但是还是没有输出。
如果我尝试最简单的:
Flux.just("a", "b", "c").subscribe(System.out::println);
...它产生预期的输出:
a
b
c
如何使用 Subscriber 从 Flux 接收值?
关键是 request
来自源的更多值。直到那个电话什么都没有发生。
示例BaseSubscriber
:
Flux<String> source = Flux.just("a", "b", "c");
source.subscribe(new BaseSubscriber<String>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1); // <-- here
}
@Override
protected void hookOnNext(String value) {
request(1); // <-- here
System.out.println(value);
}
});
与标准相同Subscriber
:
Flux<String> source = Flux.just("a", "b", "c");
source.subscribe(new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // <-- here
}
@Override
public void onNext(String s) {
subscription.request(1); // <-- here
System.out.println(s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
我想将订阅者与 Reactor Flux 连接起来。但是我的小例子没有产生任何输出:
public static void main(String[] args) throws InterruptedException {
Flux.just("a", "b", "c")
.subscribe(new BaseSubscriber<String>() {
@Override
protected void hookOnSubscribe(Subscription ignored) {
}
@Override
protected void hookOnNext(String value) {
System.out.print(value);
}
});
Thread.sleep(1000);
}
我用 org.reactivestreams.Subscriber 做了同样的尝试:
Flux.just("a", "b", "c")
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
但是还是没有输出。
如果我尝试最简单的:
Flux.just("a", "b", "c").subscribe(System.out::println);
...它产生预期的输出:
a
b
c
如何使用 Subscriber 从 Flux 接收值?
关键是 request
来自源的更多值。直到那个电话什么都没有发生。
示例BaseSubscriber
:
Flux<String> source = Flux.just("a", "b", "c");
source.subscribe(new BaseSubscriber<String>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1); // <-- here
}
@Override
protected void hookOnNext(String value) {
request(1); // <-- here
System.out.println(value);
}
});
与标准相同Subscriber
:
Flux<String> source = Flux.just("a", "b", "c");
source.subscribe(new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // <-- here
}
@Override
public void onNext(String s) {
subscription.request(1); // <-- here
System.out.println(s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});