RxJava:如何仅在 Observable 冷时订阅?
RxJava: How to Subscribe only if the Observable is Cold?
TL;DR:如何创建一个只在冷时创建订阅并在热[时将任何其他订阅调用排队的 Observable =24=]?
我想创建一个一次只能执行一个订阅的 Observable。如果任何其他订阅者订阅了 Observable,我希望他们在 Observable 完成时(在 onComplete 之后)排队到 运行。
我可以通过拥有某种堆栈并在每次 onComplete 时弹出堆栈来自己构建这个构造 - 但感觉这个功能已经存在于 RxJava 中。
有没有办法用这种方式限制订阅?
我认为没有内置运算符或运算符组合可以实现此目的。以下是我的实现方式:
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.*;
import rx.*;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
public final class SequenceSubscribers<T> implements Observable.OnSubscribe<T> {
final Observable<? extends T> source;
final Queue<Subscriber<? super T>> queue;
final AtomicInteger wip;
volatile boolean active;
public SequenceSubscribers(Observable<? extends T> source) {
this.source = source;
this.queue = new ConcurrentLinkedQueue<>();
this.wip = new AtomicInteger();
}
@Override
public void call(Subscriber<? super T> t) {
SubscriberWrapper wrapper = new SubscriberWrapper(t);
queue.add(wrapper);
t.add(wrapper);
t.add(Subscriptions.create(() -> wrapper.next()));
drain();
}
void complete(SubscriberWrapper inner) {
active = false;
drain();
}
void drain() {
if (wip.getAndIncrement() != 0) {
return;
}
do {
if (!active) {
Subscriber<? super T> s = queue.poll();
if (s != null && !s.isUnsubscribed()) {
active = true;
source.subscribe(s);
}
}
} while (wip.decrementAndGet() != 0);
}
final class SubscriberWrapper extends Subscriber<T> {
final Subscriber<? super T> actual;
final AtomicBoolean once;
public SubscriberWrapper(Subscriber<? super T> actual) {
this.actual = actual;
this.once = new AtomicBoolean();
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable e) {
actual.onError(e);
next();
}
@Override
public void onCompleted() {
actual.onCompleted();
next();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
void next() {
if (once.compareAndSet(false, true)) {
complete(this);
}
}
}
public static void main(String[] args) {
PublishSubject<Integer> ps = PublishSubject.create();
TestSubscriber<Integer> ts1 = TestSubscriber.create();
TestSubscriber<Integer> ts2 = TestSubscriber.create();
Observable<Integer> source = Observable.create(new SequenceSubscribers<>(ps));
source.subscribe(ts1);
source.subscribe(ts2);
ps.onNext(1);
ps.onNext(2);
ts1.assertValues(1, 2);
ts2.assertNoValues();
ts1.unsubscribe();
ps.onNext(3);
ps.onNext(4);
ps.onCompleted();
ts1.assertValues(1, 2);
ts2.assertValues(3, 4);
ts2.assertCompleted();
}
}
我假设您有一个潜在的冷可观察对象,您想要多次订阅,但只有在上一次订阅完成后才订阅。
我们可以巧妙地使用可以延迟新订阅的内置函数 delaySubscription
。下一个障碍是在上一个订阅完成时触发订阅。我们使用 doOnUnsubscribe
执行此操作,这会触发取消订阅、onError
和 onCompleted
操作。
public class DelaySubscribe<T> {
Observable<Integer> previouse = Observable.just(0);
private DelaySubscribe() {
}
public static <T> Observable<T> makeDelayOb(Observable<T> cold) {
return new DelaySubscribe<T>().obs(cold);
}
private Observable<T> obs(Observable<T> cold) {
return Observable.create(ob -> {
Observable<Integer> tmp = previouse;
ReplaySubject<Integer> rep = ReplaySubject.create();
previouse = rep;
cold.delaySubscription(() -> tmp).doOnUnsubscribe(() -> {
rep.onNext(0);
rep.onCompleted();
}).subscribe(ob);
});
}
用法示例:
public static void main(String[] args) throws IOException, InterruptedException {
Observable<Long> cold = Observable.interval(1, TimeUnit.SECONDS).take(2);
Observable<Long> hot = makeDelayOb(cold);
Func1<Integer, rx.Observer<Long>> obs = (Integer i) -> Observers.create(el -> System.out.println(i + "next: " + el),
er -> System.out.println(i + "error: " + er), () -> System.out.println(i + "completed"));
System.out.println("1");
Subscription s = hot.subscribe(obs.call(1));
System.out.println("2");
hot.subscribe(obs.call(2));
Thread.sleep(1500);
s.unsubscribe();
System.out.println("3");
Thread.sleep(3500);
hot.subscribe(obs.call(3));
System.out.println("4");
System.in.read();
}
}
输出:
1
2
1next: 0
3
2next: 0
2next: 1
2completed
4
3next: 0
3next: 1
3completed
我假设您有一个具有多个订阅的热可观察对象,但只希望一个订阅接收事件。当前订阅退订后,下一个应该开始接收。
我们能做的是给每个订阅和唯一编号,并保留所有订阅的列表。只有列表中的第一个订阅会收到事件,其余的 filter
事件消失了。
public class SingleSubscribe {
List<Integer> current = Collections.synchronizedList(new ArrayList<>());
int max = 0;
Object gate = new Object();
private SingleSubscribe() {
}
public static <T> Transformer<T, T> singleSubscribe() {
return new SingleSubscribe().obs();
}
private <T> Transformer<T, T> obs() {
return (source) -> Observable.create((Subscriber<? super T> ob) -> {
Integer me;
synchronized (gate) {
me = max++;
}
current.add(me);
source.doOnUnsubscribe(() -> current.remove(me)).filter(__ -> {
return current.get(0) == me;
}).subscribe(ob);
});
}
用法示例:
public static void main(String[] args) throws InterruptedException, IOException {
ConnectableObservable<Long> connectable = Observable.interval(500, TimeUnit.MILLISECONDS)
.publish();
Observable<Long> hot = connectable.compose(SingleSubscribe.<Long> singleSubscribe());
Subscription sub = connectable.connect();
Func1<Integer, rx.Observer<Long>> obs = (Integer i) -> Observers.create(el -> System.out.println(i + "next: " + el),
er -> {
System.out.println(i + "error: " + er);
er.printStackTrace();
} , () -> System.out.println(i + "completed"));
System.out.println("1");
Subscription s = hot.subscribe(obs.call(1));
System.out.println("2");
hot.take(4).subscribe(obs.call(2));
Thread.sleep(1500);
s.unsubscribe();
System.out.println("3");
Thread.sleep(500);
hot.take(2).subscribe(obs.call(3));
System.out.println("4");
System.in.read();
sub.unsubscribe();
}
}
输出:
1
2
1next: 0
1next: 1
1next: 2
3
2next: 3
4
2next: 4
2next: 5
2next: 6
2completed
3next: 6
3next: 7
3completed
请注意输出中有一个小缺陷:因为 2 在收到 6 之后立即取消订阅,但在 3 收到 6 之前。在 2 取消订阅后,3 是下一个活跃的观察者并愉快地接受 6。
一个解决方案是延迟 doOnUnsubscribe
,最简单的方法是在新的线程调度程序上安排操作:
source.doOnUnsubscribe(() -> Schedulers.newThread().createWorker().schedule(()-> current.remove(me)))
然而,这意味着当 2 取消订阅时,下一个发出的项目现在有很小的机会被忽略,并且下一个项目在 3 被激活之前到达。使用最适合您的变体。
最后,这个解决方案绝对假设源是热的。我不确定将此运算符应用于冷可观察对象时会发生什么,但结果可能出乎意料。
TL;DR:如何创建一个只在冷时创建订阅并在热[时将任何其他订阅调用排队的 Observable =24=]?
我想创建一个一次只能执行一个订阅的 Observable。如果任何其他订阅者订阅了 Observable,我希望他们在 Observable 完成时(在 onComplete 之后)排队到 运行。
我可以通过拥有某种堆栈并在每次 onComplete 时弹出堆栈来自己构建这个构造 - 但感觉这个功能已经存在于 RxJava 中。
有没有办法用这种方式限制订阅?
我认为没有内置运算符或运算符组合可以实现此目的。以下是我的实现方式:
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.*;
import rx.*;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
public final class SequenceSubscribers<T> implements Observable.OnSubscribe<T> {
final Observable<? extends T> source;
final Queue<Subscriber<? super T>> queue;
final AtomicInteger wip;
volatile boolean active;
public SequenceSubscribers(Observable<? extends T> source) {
this.source = source;
this.queue = new ConcurrentLinkedQueue<>();
this.wip = new AtomicInteger();
}
@Override
public void call(Subscriber<? super T> t) {
SubscriberWrapper wrapper = new SubscriberWrapper(t);
queue.add(wrapper);
t.add(wrapper);
t.add(Subscriptions.create(() -> wrapper.next()));
drain();
}
void complete(SubscriberWrapper inner) {
active = false;
drain();
}
void drain() {
if (wip.getAndIncrement() != 0) {
return;
}
do {
if (!active) {
Subscriber<? super T> s = queue.poll();
if (s != null && !s.isUnsubscribed()) {
active = true;
source.subscribe(s);
}
}
} while (wip.decrementAndGet() != 0);
}
final class SubscriberWrapper extends Subscriber<T> {
final Subscriber<? super T> actual;
final AtomicBoolean once;
public SubscriberWrapper(Subscriber<? super T> actual) {
this.actual = actual;
this.once = new AtomicBoolean();
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable e) {
actual.onError(e);
next();
}
@Override
public void onCompleted() {
actual.onCompleted();
next();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
void next() {
if (once.compareAndSet(false, true)) {
complete(this);
}
}
}
public static void main(String[] args) {
PublishSubject<Integer> ps = PublishSubject.create();
TestSubscriber<Integer> ts1 = TestSubscriber.create();
TestSubscriber<Integer> ts2 = TestSubscriber.create();
Observable<Integer> source = Observable.create(new SequenceSubscribers<>(ps));
source.subscribe(ts1);
source.subscribe(ts2);
ps.onNext(1);
ps.onNext(2);
ts1.assertValues(1, 2);
ts2.assertNoValues();
ts1.unsubscribe();
ps.onNext(3);
ps.onNext(4);
ps.onCompleted();
ts1.assertValues(1, 2);
ts2.assertValues(3, 4);
ts2.assertCompleted();
}
}
我假设您有一个潜在的冷可观察对象,您想要多次订阅,但只有在上一次订阅完成后才订阅。
我们可以巧妙地使用可以延迟新订阅的内置函数 delaySubscription
。下一个障碍是在上一个订阅完成时触发订阅。我们使用 doOnUnsubscribe
执行此操作,这会触发取消订阅、onError
和 onCompleted
操作。
public class DelaySubscribe<T> {
Observable<Integer> previouse = Observable.just(0);
private DelaySubscribe() {
}
public static <T> Observable<T> makeDelayOb(Observable<T> cold) {
return new DelaySubscribe<T>().obs(cold);
}
private Observable<T> obs(Observable<T> cold) {
return Observable.create(ob -> {
Observable<Integer> tmp = previouse;
ReplaySubject<Integer> rep = ReplaySubject.create();
previouse = rep;
cold.delaySubscription(() -> tmp).doOnUnsubscribe(() -> {
rep.onNext(0);
rep.onCompleted();
}).subscribe(ob);
});
}
用法示例:
public static void main(String[] args) throws IOException, InterruptedException {
Observable<Long> cold = Observable.interval(1, TimeUnit.SECONDS).take(2);
Observable<Long> hot = makeDelayOb(cold);
Func1<Integer, rx.Observer<Long>> obs = (Integer i) -> Observers.create(el -> System.out.println(i + "next: " + el),
er -> System.out.println(i + "error: " + er), () -> System.out.println(i + "completed"));
System.out.println("1");
Subscription s = hot.subscribe(obs.call(1));
System.out.println("2");
hot.subscribe(obs.call(2));
Thread.sleep(1500);
s.unsubscribe();
System.out.println("3");
Thread.sleep(3500);
hot.subscribe(obs.call(3));
System.out.println("4");
System.in.read();
}
}
输出:
1
2
1next: 0
3
2next: 0
2next: 1
2completed
4
3next: 0
3next: 1
3completed
我假设您有一个具有多个订阅的热可观察对象,但只希望一个订阅接收事件。当前订阅退订后,下一个应该开始接收。
我们能做的是给每个订阅和唯一编号,并保留所有订阅的列表。只有列表中的第一个订阅会收到事件,其余的 filter
事件消失了。
public class SingleSubscribe {
List<Integer> current = Collections.synchronizedList(new ArrayList<>());
int max = 0;
Object gate = new Object();
private SingleSubscribe() {
}
public static <T> Transformer<T, T> singleSubscribe() {
return new SingleSubscribe().obs();
}
private <T> Transformer<T, T> obs() {
return (source) -> Observable.create((Subscriber<? super T> ob) -> {
Integer me;
synchronized (gate) {
me = max++;
}
current.add(me);
source.doOnUnsubscribe(() -> current.remove(me)).filter(__ -> {
return current.get(0) == me;
}).subscribe(ob);
});
}
用法示例:
public static void main(String[] args) throws InterruptedException, IOException {
ConnectableObservable<Long> connectable = Observable.interval(500, TimeUnit.MILLISECONDS)
.publish();
Observable<Long> hot = connectable.compose(SingleSubscribe.<Long> singleSubscribe());
Subscription sub = connectable.connect();
Func1<Integer, rx.Observer<Long>> obs = (Integer i) -> Observers.create(el -> System.out.println(i + "next: " + el),
er -> {
System.out.println(i + "error: " + er);
er.printStackTrace();
} , () -> System.out.println(i + "completed"));
System.out.println("1");
Subscription s = hot.subscribe(obs.call(1));
System.out.println("2");
hot.take(4).subscribe(obs.call(2));
Thread.sleep(1500);
s.unsubscribe();
System.out.println("3");
Thread.sleep(500);
hot.take(2).subscribe(obs.call(3));
System.out.println("4");
System.in.read();
sub.unsubscribe();
}
}
输出:
1
2
1next: 0
1next: 1
1next: 2
3
2next: 3
4
2next: 4
2next: 5
2next: 6
2completed
3next: 6
3next: 7
3completed
请注意输出中有一个小缺陷:因为 2 在收到 6 之后立即取消订阅,但在 3 收到 6 之前。在 2 取消订阅后,3 是下一个活跃的观察者并愉快地接受 6。
一个解决方案是延迟 doOnUnsubscribe
,最简单的方法是在新的线程调度程序上安排操作:
source.doOnUnsubscribe(() -> Schedulers.newThread().createWorker().schedule(()-> current.remove(me)))
然而,这意味着当 2 取消订阅时,下一个发出的项目现在有很小的机会被忽略,并且下一个项目在 3 被激活之前到达。使用最适合您的变体。
最后,这个解决方案绝对假设源是热的。我不确定将此运算符应用于冷可观察对象时会发生什么,但结果可能出乎意料。