RxJava:如何刷新固定大小的缓冲区
RxJava: how to flush a fixed-size buffer
我正在使用 PublishProcessor<Log> logStream
以 Rx 方式实现日志记录逻辑。
生产:
logStream.onNext(Log)
消费:
logStream
.subscribeOn(Subscribers.io())
.buffer(1000, TimeUnit.MILLISECONDS, 300)
.subscribe(dumpLogs, errorHandler)
日志消息将以指定的固定持续时间 (1s) 和最大大小 (300) 缓存在 RxJava 缓冲区中。
现在我想添加一个新逻辑:在需要时立即转储日志。我用 Google 和 Whosebug 搜索了一段时间,但仍然不知道如何实现它。
目前我知道的最好的方法是合并一个subject
和区间(参考RxJava: how do you flush a timed buffer?),然后通过subject.onNext()
.
刷新缓冲区
但是,此方法不支持缓冲区大小限制。有没有更好的方法来实现这个?
我认为您可以修改您链接的另一个答案中的代码,如下所示,引入对 PublishSubject
的第二个订阅,在每个值之后递增和 AtomicInteger
:
PublishSubject<String> publishProcessor = PublishSubject.create();
Subject<Long> flush = PublishSubject.<Long>create().toSerialized();
AtomicInteger bufferSize = new AtomicInteger();
int maxBufferSize = 100;
publishProcessor
.buffer( flush.mergeWith( Observable.timer( 10, TimeUnit.MILLISECONDS ))
.take( 1 )
.repeat() )
.observeOn( Schedulers.io() )
.doOnNext( buffer -> bufferSize.set( 0 ))
.subscribe( /*...*/ );
publishProcessor.map( val -> bufferSize.incrementAndGet() )
.filter( size -> size == maxBufferSize )
.subscribe( size -> flush.onNext( 1L ));
// manual flush
flush.onNext( 1L );
我正在使用 PublishProcessor<Log> logStream
以 Rx 方式实现日志记录逻辑。
生产:
logStream.onNext(Log)
消费:
logStream
.subscribeOn(Subscribers.io())
.buffer(1000, TimeUnit.MILLISECONDS, 300)
.subscribe(dumpLogs, errorHandler)
日志消息将以指定的固定持续时间 (1s) 和最大大小 (300) 缓存在 RxJava 缓冲区中。
现在我想添加一个新逻辑:在需要时立即转储日志。我用 Google 和 Whosebug 搜索了一段时间,但仍然不知道如何实现它。
目前我知道的最好的方法是合并一个subject
和区间(参考RxJava: how do you flush a timed buffer?),然后通过subject.onNext()
.
但是,此方法不支持缓冲区大小限制。有没有更好的方法来实现这个?
我认为您可以修改您链接的另一个答案中的代码,如下所示,引入对 PublishSubject
的第二个订阅,在每个值之后递增和 AtomicInteger
:
PublishSubject<String> publishProcessor = PublishSubject.create();
Subject<Long> flush = PublishSubject.<Long>create().toSerialized();
AtomicInteger bufferSize = new AtomicInteger();
int maxBufferSize = 100;
publishProcessor
.buffer( flush.mergeWith( Observable.timer( 10, TimeUnit.MILLISECONDS ))
.take( 1 )
.repeat() )
.observeOn( Schedulers.io() )
.doOnNext( buffer -> bufferSize.set( 0 ))
.subscribe( /*...*/ );
publishProcessor.map( val -> bufferSize.incrementAndGet() )
.filter( size -> size == maxBufferSize )
.subscribe( size -> flush.onNext( 1L ));
// manual flush
flush.onNext( 1L );