RxJava- 无法访问 Observable 的订阅者?
RxJava- No access to Observable's subscribers?
我学会了 it is undesirable 在响应式编程中使用 Subjects
,尽管我发现它们非常方便。但我知道他们可能会被滥用。因此,我尝试创建一个无限 Observable<ImmutableMap<Integer,ActionProfile>
,每次调用 refresh()
时都需要发布一个新的 ImmutableMap
。我还有一个 forKey()
方法,该方法 returns 和 Observable
检索最新的 ActionProfile
匹配特定键。
但是,我对订阅者的处理方式让人感觉有些不合时宜。如果 Observable 的生命是无限的,你必须在 Observable 的构造之外自己管理订阅者,我是正确的吗? Observable 是否维护其订阅者列表?还是我有责任随时打电话给他们 onNext()
?
public final class ActionProfileManager {
private final Observable<ImmutableMap<Integer,ActionProfile>> actionProfiles;
private volatile ImmutableMap<Integer,ActionProfile> actionProfileMap;
//do I really need this?
private final CopyOnWriteArrayList<Subscriber<? super ImmutableMap<Integer,ActionProfile>>> subscribers = new CopyOnWriteArrayList<>();
private ActionProfileManager() {
this.actionProfiles = Observable.create(subscriber -> {
subscriber.onNext(actionProfileMap);
subscribers.add(subscriber); // is it up to me to capture the subscriber here or is it already saved somewhere for me?
});
}
public void refresh() {
actionProfileMap = importFromDb();
subscribers.forEach(s -> s.onNext(actionProfileMap));
}
public Observable<ActionProfile> forKey(int actionProfileId) {
return actionProfiles.map(m -> m.get(actionProfileId));
}
private ImmutableMap<Integer,ActionProfile> importFromDb() {
return ImmutableMap.of(); //import data here
}
}
Cold Observables 通常一次与单个订阅者交互,即使您订阅了它们,它们 运行 独立并且不需要真正了解彼此。
另一方面,Subjects 必须在订阅者多播他们自己收到的事件时跟踪他们的订阅者。
快速查看您的代码表明存在一些竞争条件和丢失通知的可能性。取而代之的是,您可以依赖 BehaviorSubject
,这是异步词的 'reactive property' 。让它存储当前的不可变映射并处理订阅者:
BehaviorSubject<ImmutableMap> bs = BehaviorSubject.create();
Subject<ImmutableMap, ImmutableMap> sync = bs.toSerialized();
forKey(k): bs.map(m -> m.get(k));
refresh(): sync.onNext(importFromDb());
我学会了 it is undesirable 在响应式编程中使用 Subjects
,尽管我发现它们非常方便。但我知道他们可能会被滥用。因此,我尝试创建一个无限 Observable<ImmutableMap<Integer,ActionProfile>
,每次调用 refresh()
时都需要发布一个新的 ImmutableMap
。我还有一个 forKey()
方法,该方法 returns 和 Observable
检索最新的 ActionProfile
匹配特定键。
但是,我对订阅者的处理方式让人感觉有些不合时宜。如果 Observable 的生命是无限的,你必须在 Observable 的构造之外自己管理订阅者,我是正确的吗? Observable 是否维护其订阅者列表?还是我有责任随时打电话给他们 onNext()
?
public final class ActionProfileManager {
private final Observable<ImmutableMap<Integer,ActionProfile>> actionProfiles;
private volatile ImmutableMap<Integer,ActionProfile> actionProfileMap;
//do I really need this?
private final CopyOnWriteArrayList<Subscriber<? super ImmutableMap<Integer,ActionProfile>>> subscribers = new CopyOnWriteArrayList<>();
private ActionProfileManager() {
this.actionProfiles = Observable.create(subscriber -> {
subscriber.onNext(actionProfileMap);
subscribers.add(subscriber); // is it up to me to capture the subscriber here or is it already saved somewhere for me?
});
}
public void refresh() {
actionProfileMap = importFromDb();
subscribers.forEach(s -> s.onNext(actionProfileMap));
}
public Observable<ActionProfile> forKey(int actionProfileId) {
return actionProfiles.map(m -> m.get(actionProfileId));
}
private ImmutableMap<Integer,ActionProfile> importFromDb() {
return ImmutableMap.of(); //import data here
}
}
Cold Observables 通常一次与单个订阅者交互,即使您订阅了它们,它们 运行 独立并且不需要真正了解彼此。
另一方面,Subjects 必须在订阅者多播他们自己收到的事件时跟踪他们的订阅者。
快速查看您的代码表明存在一些竞争条件和丢失通知的可能性。取而代之的是,您可以依赖 BehaviorSubject
,这是异步词的 'reactive property' 。让它存储当前的不可变映射并处理订阅者:
BehaviorSubject<ImmutableMap> bs = BehaviorSubject.create();
Subject<ImmutableMap, ImmutableMap> sync = bs.toSerialized();
forKey(k): bs.map(m -> m.get(k));
refresh(): sync.onNext(importFromDb());