RxJava 只订阅特定的密钥
RxJava subscribe only for specific key
我收到了无穷无尽的事件流,假设 Observable<Event>
其中 Event(userId, payload)
。我必须提供允许订阅用户事件的功能。 Observable<Event>
是一个热门资源,但没关系,因为如果没有感兴趣的用户,我可以删除事件。我正在尝试使用 RxJava API 以纯粹的流媒体方式来做这件事,但这对我来说似乎很难。我尝试了什么:
- 使用内置运算符以最省力的方式完成。
Observable<Event> events =; //;
events = events.publish().refCount();
Observable<Event> subscribeUser(int userId) {
return events.filter(it -> it.userId == userId);
}
效果很好,问题是它对我来说太慢了——我不需要向所有订阅者进行多播,然后在它不属于任何用户时进行过滤。想象一下,每秒有数万个用户和 1000 个事件 - O(n) 复杂度对我来说太多了 - 我认为我需要一些散列。
PublishSubject
ConcurrentHashMap<Integer, PublishSubject<Event>> subscriptions = new ConcurrentHashMap<>();
Observable<Event> events =; //
events.subscribe(event -> /*pushing on user subject*/);
Observable<Event> subscribeUser(int userId) {
return subscriptions.putIfAbsent(userId, PublishSubject.create())
.doOnTerminate(() -> subscriptions.remove(userId))
}
这需要更多的编码(比上面的例子多得多)来管理订阅,它不是纯粹的流媒体 PublishSubject
是为多个订阅者设计的,所以它需要更多的代码 space 用于内部数组我不需要,因为我每个主题只有一个用户。
groupBy()
运算符
我可以按生成 GroupedObservable
流的 userId 对事件进行分组,然后我可以 .ignoreElements()
直到没有订阅,但我不知道如何将这两者连接在一起。一方面,我有 GroupedObservable<Integer, Event>
的热 Observable
流,然后是传入的 Observable<Event> subscribeUser(int userId)
请求。我怎么能说:“嘿,从现在开始不要丢弃元素,因为有人对此感兴趣”和 return GroupedObservable
作为对请求的响应?
自定义Observable
这里的问题是 Observable.create(subscriber -> )
订户没有关于其密钥 (userId) 的信息 - 我也无法控制 subscribe()
方法,因此我无法将我的自定义实现传递到那里并在之后检索密钥铸造或某种方式。这也很糟糕,因为我需要自己管理状态。
lift()
自定义运算符
与 4 类似的问题,甚至更糟,因为我无法控制在这里创建 Observable
所以我还需要防止与 source 的多个连接
选项 2) 是最直接的。选项 4 需要更聪明的编码 Observable
.
ConcurrentMap<Integer, Observer<Event>> observers = ...
void signal(Event event) {
var observer = observers.get(event.userId);
if (observer != null) {
observer.onNext(event);
}
}
Observable<Event> observeUser(int userId) {
return Observable.unsafeCreate(observer -> {
var remove = Disposable.fromAction(() -> observers.remove(userId));
subscriber.onSubscribe(remove);
observers.put(userId, observer);
});
}
我收到了无穷无尽的事件流,假设 Observable<Event>
其中 Event(userId, payload)
。我必须提供允许订阅用户事件的功能。 Observable<Event>
是一个热门资源,但没关系,因为如果没有感兴趣的用户,我可以删除事件。我正在尝试使用 RxJava API 以纯粹的流媒体方式来做这件事,但这对我来说似乎很难。我尝试了什么:
- 使用内置运算符以最省力的方式完成。
Observable<Event> events =; //;
events = events.publish().refCount();
Observable<Event> subscribeUser(int userId) {
return events.filter(it -> it.userId == userId);
}
效果很好,问题是它对我来说太慢了——我不需要向所有订阅者进行多播,然后在它不属于任何用户时进行过滤。想象一下,每秒有数万个用户和 1000 个事件 - O(n) 复杂度对我来说太多了 - 我认为我需要一些散列。
PublishSubject
ConcurrentHashMap<Integer, PublishSubject<Event>> subscriptions = new ConcurrentHashMap<>();
Observable<Event> events =; //
events.subscribe(event -> /*pushing on user subject*/);
Observable<Event> subscribeUser(int userId) {
return subscriptions.putIfAbsent(userId, PublishSubject.create())
.doOnTerminate(() -> subscriptions.remove(userId))
}
这需要更多的编码(比上面的例子多得多)来管理订阅,它不是纯粹的流媒体 PublishSubject
是为多个订阅者设计的,所以它需要更多的代码 space 用于内部数组我不需要,因为我每个主题只有一个用户。
groupBy()
运算符 我可以按生成GroupedObservable
流的 userId 对事件进行分组,然后我可以.ignoreElements()
直到没有订阅,但我不知道如何将这两者连接在一起。一方面,我有GroupedObservable<Integer, Event>
的热Observable
流,然后是传入的Observable<Event> subscribeUser(int userId)
请求。我怎么能说:“嘿,从现在开始不要丢弃元素,因为有人对此感兴趣”和 returnGroupedObservable
作为对请求的响应?自定义
Observable
这里的问题是 Observable.create(subscriber -> )
订户没有关于其密钥 (userId) 的信息 - 我也无法控制 subscribe()
方法,因此我无法将我的自定义实现传递到那里并在之后检索密钥铸造或某种方式。这也很糟糕,因为我需要自己管理状态。
lift()
自定义运算符 与 4 类似的问题,甚至更糟,因为我无法控制在这里创建Observable
所以我还需要防止与 source 的多个连接
选项 2) 是最直接的。选项 4 需要更聪明的编码 Observable
.
ConcurrentMap<Integer, Observer<Event>> observers = ...
void signal(Event event) {
var observer = observers.get(event.userId);
if (observer != null) {
observer.onNext(event);
}
}
Observable<Event> observeUser(int userId) {
return Observable.unsafeCreate(observer -> {
var remove = Disposable.fromAction(() -> observers.remove(userId));
subscriber.onSubscribe(remove);
observers.put(userId, observer);
});
}