RxJava 一个发布者的多个消费者
RxJava multiple consumers of one publisher
我正在编写某种带有缓存的中间件 HTTP 代理。工作流程是:
- 客户端请求此代理资源
- 如果缓存中存在资源,代理returns它
- 如果找不到资源,则代理获取远程资源并returns给用户。代理在数据加载时将此资源保存到缓存中。
我的接口有 Publisher<ByteBuffer>
远程资源流,接受 Publisher<ByteBuffer>
保存的缓存,以及接受 Publisher<ByteBuffer>
作为响应的客户端连接:
// remote resource
interface Resource {
Publisher<ByteBuffer> fetch();
}
// cache
interface Cache {
Completable save(Publisher<ByteBuffer> data);
}
// clien response connection
interface Connection {
Completable send(Publisher<ByteBuffer> data);
}
我的问题是,在向客户端发送响应时,我需要延迟将字节缓冲区流保存到缓存中,因此 客户端应该负责 请求 ByteByffer
来自远程资源的块,不缓存。
我尝试使用Publisher::cache
方法,但这对我来说不是一个好的选择,因为它将所有接收到的数据保存在内存中,这是不可接受的,因为缓存的数据可能只有几GB。
作为一种解决方法,我创建了 Subject
填充了从 Resource
收到的下一个项目:
private final Cache cache;
private final Connection out;
Completable proxy(Resource res) {
Subject<ByteBuffer> mirror = PublishSUbject.create();
return Completable.mergeArray(
out.send(res.fetch().doOnNext(mirror::onNext),
cache.save(mirror.toFlowable(BackpressureStrategy.BUFFER))
);
}
是否可以重用相同的 Publisher
而不在内存中缓存项目,并且只有一个订阅者负责从发布者请求项目?
我可能遗漏了一些东西(添加了关于我的 Publisher
界面版本不同的评论)。
但是..这就是我在概念上如何做这样的事情。
我打算简化接口来处理 Integers
:
// remote resource
interface Resource {
ConnectableObservable<Integer> fetch();
}
// cache
interface Cache {
Completable save(Integer data);
}
// client response connection
interface Connection {
Completable send(Integer data);
}
我会使用 Observable::publish
创建一个 ConnectableObservable
并建立两个订阅:
@Test
public void testProxy()
{
// Override schedulers:
TestScheduler s = new TestScheduler();
RxJavaPlugins.setIoSchedulerHandler(
scheduler -> s );
RxJavaPlugins.setComputationSchedulerHandler(
scheduler -> s );
// Mock interfaces:
Resource resource = () -> Observable.range( 1, 100 )
.publish();
Cache cache = data -> Completable.fromObservable( Observable.just( data )
.delay( 100, TimeUnit.MILLISECONDS )
.doOnNext( __ -> System.out.println( String.format( "Caching %d", data ))));
Connection connection = data -> Completable.fromObservable( Observable.just( data )
.delay( 500, TimeUnit.MILLISECONDS )
.doOnNext( __ -> System.out.println( String.format( "Sending %d", data ))));
// Subscribe to resource:
ConnectableObservable<Integer> observable = resource.fetch();
observable
.observeOn( Schedulers.io() )
.concatMapCompletable( data -> connection.send( data ))
.subscribe();
observable
.observeOn( Schedulers.computation() )
.concatMapCompletable( data -> cache.save( data ))
.subscribe();
observable.connect();
// Simulate passage of time:
s.advanceTimeBy( 10, TimeUnit.SECONDS );
}
输出:
Caching 1
Caching 2
Caching 3
Caching 4
Sending 1
Caching 5
Caching 6
Caching 7
Caching 8
Caching 9
Sending 2
Caching 10
. . .
更新
根据您的评论,听起来尊重背压对您的情况很重要。
假设您有一个支持背压的 Publisher
,您可以将其转换为 Flowable
,如下所示:
Flowable<T> flowable = Flowable.fromPublisher( publisher );
一旦您拥有 Flowable
,您就可以允许多个订阅者,而不必担心每个订阅者都必须从 Publisher
请求值(或者任一订阅者在建立订阅时错过任何事件)。您可以通过调用 flowable.publish()
来创建 ConnectableFlowable
.
ConnectableFlowable<T> flowable = Flowable.fromPublisher( publisher ).publish();
out.send(flowable); // calls flowable.subscribe()
cache.save(flowable); // calls flowable.subscribe()
flowable.connect(); // begins emitting values
我正在编写某种带有缓存的中间件 HTTP 代理。工作流程是:
- 客户端请求此代理资源
- 如果缓存中存在资源,代理returns它
- 如果找不到资源,则代理获取远程资源并returns给用户。代理在数据加载时将此资源保存到缓存中。
我的接口有 Publisher<ByteBuffer>
远程资源流,接受 Publisher<ByteBuffer>
保存的缓存,以及接受 Publisher<ByteBuffer>
作为响应的客户端连接:
// remote resource
interface Resource {
Publisher<ByteBuffer> fetch();
}
// cache
interface Cache {
Completable save(Publisher<ByteBuffer> data);
}
// clien response connection
interface Connection {
Completable send(Publisher<ByteBuffer> data);
}
我的问题是,在向客户端发送响应时,我需要延迟将字节缓冲区流保存到缓存中,因此 客户端应该负责 请求 ByteByffer
来自远程资源的块,不缓存。
我尝试使用Publisher::cache
方法,但这对我来说不是一个好的选择,因为它将所有接收到的数据保存在内存中,这是不可接受的,因为缓存的数据可能只有几GB。
作为一种解决方法,我创建了 Subject
填充了从 Resource
收到的下一个项目:
private final Cache cache;
private final Connection out;
Completable proxy(Resource res) {
Subject<ByteBuffer> mirror = PublishSUbject.create();
return Completable.mergeArray(
out.send(res.fetch().doOnNext(mirror::onNext),
cache.save(mirror.toFlowable(BackpressureStrategy.BUFFER))
);
}
是否可以重用相同的 Publisher
而不在内存中缓存项目,并且只有一个订阅者负责从发布者请求项目?
我可能遗漏了一些东西(添加了关于我的 Publisher
界面版本不同的评论)。
但是..这就是我在概念上如何做这样的事情。
我打算简化接口来处理 Integers
:
// remote resource
interface Resource {
ConnectableObservable<Integer> fetch();
}
// cache
interface Cache {
Completable save(Integer data);
}
// client response connection
interface Connection {
Completable send(Integer data);
}
我会使用 Observable::publish
创建一个 ConnectableObservable
并建立两个订阅:
@Test
public void testProxy()
{
// Override schedulers:
TestScheduler s = new TestScheduler();
RxJavaPlugins.setIoSchedulerHandler(
scheduler -> s );
RxJavaPlugins.setComputationSchedulerHandler(
scheduler -> s );
// Mock interfaces:
Resource resource = () -> Observable.range( 1, 100 )
.publish();
Cache cache = data -> Completable.fromObservable( Observable.just( data )
.delay( 100, TimeUnit.MILLISECONDS )
.doOnNext( __ -> System.out.println( String.format( "Caching %d", data ))));
Connection connection = data -> Completable.fromObservable( Observable.just( data )
.delay( 500, TimeUnit.MILLISECONDS )
.doOnNext( __ -> System.out.println( String.format( "Sending %d", data ))));
// Subscribe to resource:
ConnectableObservable<Integer> observable = resource.fetch();
observable
.observeOn( Schedulers.io() )
.concatMapCompletable( data -> connection.send( data ))
.subscribe();
observable
.observeOn( Schedulers.computation() )
.concatMapCompletable( data -> cache.save( data ))
.subscribe();
observable.connect();
// Simulate passage of time:
s.advanceTimeBy( 10, TimeUnit.SECONDS );
}
输出:
Caching 1
Caching 2
Caching 3
Caching 4
Sending 1
Caching 5
Caching 6
Caching 7
Caching 8
Caching 9
Sending 2
Caching 10
. . .
更新
根据您的评论,听起来尊重背压对您的情况很重要。
假设您有一个支持背压的 Publisher
,您可以将其转换为 Flowable
,如下所示:
Flowable<T> flowable = Flowable.fromPublisher( publisher );
一旦您拥有 Flowable
,您就可以允许多个订阅者,而不必担心每个订阅者都必须从 Publisher
请求值(或者任一订阅者在建立订阅时错过任何事件)。您可以通过调用 flowable.publish()
来创建 ConnectableFlowable
.
ConnectableFlowable<T> flowable = Flowable.fromPublisher( publisher ).publish();
out.send(flowable); // calls flowable.subscribe()
cache.save(flowable); // calls flowable.subscribe()
flowable.connect(); // begins emitting values