RxJava:如何刷新固定大小的缓冲区

RxJava: how to flush a fixed-size buffer

我正在使用 PublishProcessor<Log> logStream 以 Rx 方式实现日志记录逻辑。

生产:

logStream.onNext(Log)

消费:

logStream
    .subscribeOn(Subscribers.io())
    .buffer(1000, TimeUnit.MILLISECONDS, 300)
    .subscribe(dumpLogs, errorHandler)

日志消息将以指定的固定持续时间 (1s) 和最大大小 (300) 缓存在 RxJava 缓冲区中。

现在我想添加一个新逻辑:在需要时立即转储日志。我用 Google 和 Whosebug 搜索了一段时间,但仍然不知道如何实现它。

目前我知道的最好的方法是合并一个subject和区间(参考RxJava: how do you flush a timed buffer?),然后通过subject.onNext().

刷新缓冲区

但是,此方法不支持缓冲区大小限制。有没有更好的方法来实现这个?

我认为您可以修改您链接的另一个答案中的代码,如下所示,引入对 PublishSubject 的第二个订阅,在每个值之后递增和 AtomicInteger

    PublishSubject<String> publishProcessor = PublishSubject.create();
    Subject<Long> flush = PublishSubject.<Long>create().toSerialized();
    AtomicInteger bufferSize = new AtomicInteger();
    int maxBufferSize = 100;
    
    publishProcessor
        .buffer( flush.mergeWith( Observable.timer( 10, TimeUnit.MILLISECONDS ))
            .take( 1 )
            .repeat() )
        .observeOn( Schedulers.io() )
        .doOnNext( buffer -> bufferSize.set( 0 ))
        .subscribe( /*...*/ );

    publishProcessor.map( val -> bufferSize.incrementAndGet() )
        .filter( size -> size == maxBufferSize )
        .subscribe( size -> flush.onNext( 1L ));
    
    // manual flush
    flush.onNext( 1L );