使用 RxJava 将无限的分组事件流写入旋转文件

Using RxJava to write an infinite stream of grouped events to rotating files

我正在尝试实现以下行为:

我编写了以下示例代码来实现上述行为:


    private static final Integer EVENTS = 3;
    private static final Long SHORTER = 1L;
    private static final Long LONGER = 5L;
    private static final Long SLEEP = 100000L;

    public static void main(final String[] args) throws Exception {

        val files = new DualHashBidiMap<Integer, File>();

        Observable.just(EVENTS)
                .flatMap(num -> Observable.fromIterable(ThreadLocalRandom.current().ints(num).boxed().collect(Collectors.toList())))
                .groupBy(num -> Math.abs(num % 2))
                .repeatWhen(completed -> completed.delay(SHORTER, TimeUnit.SECONDS))
                .map(group -> {
                    val file = files.computeIfAbsent(group.getKey(), Unchecked.function(key -> File.createTempFile(String.format("%03d-", key), ".txt")));
                    group.map(Object::toString).toList().subscribe(lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true));
                    return file;
                })
                .buffer(LONGER, TimeUnit.SECONDS)
                .flatMap(Observable::fromIterable)
                .distinct(File::getName)
                .doOnNext(files::removeValue)
                .doOnNext(file -> System.out.println("File - '" + file + "', Lines - " + FileUtils.readLines(file, StandardCharsets.UTF_8)))
                .subscribe();
        Thread.sleep(SLEEP);
    }

虽然它按预期工作(暂时搁置地图访问的线程安全问题,我使用 commons-collections4 的 bidi-map 只是为了演示),我想知道是否有以 RX 形式实现上述功能的方法,而不依赖于外部地图访问?

请注意,关键 在组创建后立即写入文件,这意味着我们必须使文件在生成的事件组范围之外存活

提前致谢。

有趣的问题..我可能是错的,但我认为你无法避免在管道中的某个地方出现 MapFiles

我认为我的解决方案可以进一步清理,但它似乎完成了以下工作:

  • 不再需要双向映射
  • 避免调用 Map.remove(...)

我建议您将 FilesMap 视为不同的 Observable,以较慢的间隔发出全新的 Map

    Observable<HashMap<Integer, File>> fileObservable = Observable.fromCallable(
                () -> new HashMap<Integer, File>() )
            .repeatWhen( completed -> completed.delay( LONGER, TimeUnit.SECONDS ));

然后在你的事件Observable中,你可以调用.withLatestFrom( fileObservable, ( group, files ) -> {...} )注意:这个块仍然不完整):

    Observable.just( EVENTS )
        .flatMap( num -> Observable.fromIterable(
                ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
        .groupBy( num -> Math.abs( num % 2 ))
        .repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
        .withLatestFrom( fileObservable, ( group, files ) -> {

            File file = files.computeIfAbsent(
                    group.getKey(),
                    Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));

            group.map( Object::toString ).toList()
                .subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));

            return files;
        } )

到目前为止一切顺利,您将获得与您的活动一起提供的最新一组 Files。接下来您必须处理 Files。我认为您可以使用 distinctUntilChanged() 来做到这一点。它应该非常有效,因为它会在幕后调用 HashMap.equals(...) 并且 Map 对象的标识大部分时间都不会改变。 HashMap.equals(...) 首先检查相同的身份。

由于此时您真正感兴趣的是处理 previous 组发出的 Files 而不是当前的,您可以使用 .scan(( prev, current ) -> {...} )操作员。这样,这是上面完成的代码块:

    Observable.just( EVENTS )
        .flatMap( num -> Observable.fromIterable(
                ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
        .groupBy( num -> Math.abs( num % 2 ))
        .repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
        .withLatestFrom( fileObservable, ( group, files ) -> {

            File file = files.computeIfAbsent(
                    group.getKey(),
                    Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));

            group.map( Object::toString ).toList()
                .subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));

            return files;
        } )
        .distinctUntilChanged()
        .scan(( prev, current ) -> {

            Observable.fromIterable( prev.entrySet() )
                .map( Entry::getValue )
                .subscribe( file -> System.out.println( "File - '" + file + "', Lines - " +
                                FileUtils.readLines( file, StandardCharsets.UTF_8 )));

            return current;
        } )
        .subscribe();

    Thread.sleep( SLEEP );

比您原来的解决方案稍长,但可能会解决几个问题。