如何在 ReactiveX 中过滤过于频繁的 onSubscribe 请求
How to filter in ReactiveX too frequent onSubscribe requests
我有一个执行耗时网络操作的可观察对象。客户端代码可能会频繁订阅导致高网络负载的可观察对象。
由于我们无法控制订阅者何时出现,因此必须在可观察端完成。
当并发订阅者数量达到最大值时,您希望更多订阅者接收空流。
给定您要限制订阅的来源,执行以下操作:
Observable<T> limited = source.compose(
new TransformerLimitSubscribers<T>(
new AtomicInteger(), maxSubscribers))
.onErrorResumeNext(Observable.<T>empty());
...
limited.subscribe(s1);
...
limited.subscribe(s2);
其中变换器由此定义 class:
public final class TransformerLimitSubscribers<T> implements Transformer<T, T> {
private final AtomicInteger subscriberCount;
private final int maxSubscribers;
public TransformerLimitSubscribers(AtomicInteger subscriberCount, int maxSubscribers) {
this.subscriberCount = subscriberCount;
this.maxSubscribers = maxSubscribers;
}
@Override
public Observable<T> call(Observable<T> o) {
return o.doOnSubscribe(onSubscribe()).doOnUnsubscribe(onUnsubscribe());
}
private Action0 onSubscribe() {
return new Action0() {
@Override
public void call() {
if (subscriberCount.incrementAndGet() > maxSubscribers)
throw new TooManySubscribersException();
}
};
}
private Action0 onUnsubscribe() {
return new Action0() {
@Override
public void call() {
subscriberCount.decrementAndGet();
}
};
}
public static class TooManySubscribersException extends RuntimeException {
}
}
我有一个执行耗时网络操作的可观察对象。客户端代码可能会频繁订阅导致高网络负载的可观察对象。
由于我们无法控制订阅者何时出现,因此必须在可观察端完成。
当并发订阅者数量达到最大值时,您希望更多订阅者接收空流。
给定您要限制订阅的来源,执行以下操作:
Observable<T> limited = source.compose(
new TransformerLimitSubscribers<T>(
new AtomicInteger(), maxSubscribers))
.onErrorResumeNext(Observable.<T>empty());
...
limited.subscribe(s1);
...
limited.subscribe(s2);
其中变换器由此定义 class:
public final class TransformerLimitSubscribers<T> implements Transformer<T, T> {
private final AtomicInteger subscriberCount;
private final int maxSubscribers;
public TransformerLimitSubscribers(AtomicInteger subscriberCount, int maxSubscribers) {
this.subscriberCount = subscriberCount;
this.maxSubscribers = maxSubscribers;
}
@Override
public Observable<T> call(Observable<T> o) {
return o.doOnSubscribe(onSubscribe()).doOnUnsubscribe(onUnsubscribe());
}
private Action0 onSubscribe() {
return new Action0() {
@Override
public void call() {
if (subscriberCount.incrementAndGet() > maxSubscribers)
throw new TooManySubscribersException();
}
};
}
private Action0 onUnsubscribe() {
return new Action0() {
@Override
public void call() {
subscriberCount.decrementAndGet();
}
};
}
public static class TooManySubscribersException extends RuntimeException {
}
}