Spring + RxJava + Schedule cron job: 为什么 bean 在 connectable observable 上只调用一次
Spring + RxJava + Schedule cron job: why beans called only once on connectable observable
我有以下演示问题的代码:
@Component
public class App {
@Autowired S1 s1;
@Autowired S2 s2;
int jobs = 0;
@Scheduled(cron = "0 * * * * ?")
void foo() {
System.out.println("schedule cron job: " + jobs++);
Observable<String> observable = Observable.just("bar");
ConnectableObservable<String> publishedObservable = observable.publish();
publishedObservable.subscribe(s1);
publishedObservable.subscribe(s2);
publishedObservable.connect();
}
}
订户 1:
@Component
public class S1 extends Subscriber<String> {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("S1:::: Times called: " + counter.getAndIncrement() + ", input: " + s);
}
}
订户 2:
@Component
public class S2 extends Subscriber<String> {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("S2:::: Times called: " + counter.getAndIncrement() + ", input: " + s);
}
}
输出将是:
schedule cron job: 0
S1:::: Times called: 0, input: bar
S2:::: Times called: 0, input: bar
schedule cron job: 1
schedule cron job: 2
schedule cron job: 3
schedule cron job: 4
......
为什么每次调用 foo 方法时都不调用 S1 和 S2?
如何实现?
这是因为 rx 某些订阅逻辑还是因为这些 bean 是单例?
Why is S1 and S2 not called each time the foo method called ?
RxJava Subscriber
s 是有状态的,一旦它们消费了一个序列,它们就不再可用并报告自己为未订阅。再次订阅他们没有任何效果。每次需要订阅源时,您都必须重新创建它们。
我有以下演示问题的代码:
@Component
public class App {
@Autowired S1 s1;
@Autowired S2 s2;
int jobs = 0;
@Scheduled(cron = "0 * * * * ?")
void foo() {
System.out.println("schedule cron job: " + jobs++);
Observable<String> observable = Observable.just("bar");
ConnectableObservable<String> publishedObservable = observable.publish();
publishedObservable.subscribe(s1);
publishedObservable.subscribe(s2);
publishedObservable.connect();
}
}
订户 1:
@Component
public class S1 extends Subscriber<String> {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("S1:::: Times called: " + counter.getAndIncrement() + ", input: " + s);
}
}
订户 2:
@Component
public class S2 extends Subscriber<String> {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("S2:::: Times called: " + counter.getAndIncrement() + ", input: " + s);
}
}
输出将是:
schedule cron job: 0
S1:::: Times called: 0, input: bar
S2:::: Times called: 0, input: bar
schedule cron job: 1
schedule cron job: 2
schedule cron job: 3
schedule cron job: 4
......
为什么每次调用 foo 方法时都不调用 S1 和 S2? 如何实现?
这是因为 rx 某些订阅逻辑还是因为这些 bean 是单例?
Why is S1 and S2 not called each time the foo method called ?
RxJava Subscriber
s 是有状态的,一旦它们消费了一个序列,它们就不再可用并报告自己为未订阅。再次订阅他们没有任何效果。每次需要订阅源时,您都必须重新创建它们。