使用 RxJava 将无限的分组事件流写入旋转文件
Using RxJava to write an infinite stream of grouped events to rotating files
我正在尝试实现以下行为:
- 定期有事件流polled/generated(持续时间短,比如 1 秒)
- 然后根据某些内部特征对事件进行分组。
- 每组事件都立即写入一个匹配文件(这是我要维护的行为的关键)
- 文件预计将在后续事件中重新用于匹配组(具有相同的密钥),直到它们 sealed/rotated
- 如果持续时间较长(比如 5 秒),文件会 sealed/rotated 并根据使用额外的订阅者采取行动
我编写了以下示例代码来实现上述行为:
private static final Integer EVENTS = 3;
private static final Long SHORTER = 1L;
private static final Long LONGER = 5L;
private static final Long SLEEP = 100000L;
public static void main(final String[] args) throws Exception {
val files = new DualHashBidiMap<Integer, File>();
Observable.just(EVENTS)
.flatMap(num -> Observable.fromIterable(ThreadLocalRandom.current().ints(num).boxed().collect(Collectors.toList())))
.groupBy(num -> Math.abs(num % 2))
.repeatWhen(completed -> completed.delay(SHORTER, TimeUnit.SECONDS))
.map(group -> {
val file = files.computeIfAbsent(group.getKey(), Unchecked.function(key -> File.createTempFile(String.format("%03d-", key), ".txt")));
group.map(Object::toString).toList().subscribe(lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true));
return file;
})
.buffer(LONGER, TimeUnit.SECONDS)
.flatMap(Observable::fromIterable)
.distinct(File::getName)
.doOnNext(files::removeValue)
.doOnNext(file -> System.out.println("File - '" + file + "', Lines - " + FileUtils.readLines(file, StandardCharsets.UTF_8)))
.subscribe();
Thread.sleep(SLEEP);
}
虽然它按预期工作(暂时搁置地图访问的线程安全问题,我使用 commons-collections4
的 bidi-map 只是为了演示),我想知道是否有以 纯 RX 形式实现上述功能的方法,而不依赖于外部地图访问?
请注意,关键 在组创建后立即写入文件,这意味着我们必须使文件在生成的事件组范围之外存活
提前致谢。
有趣的问题..我可能是错的,但我认为你无法避免在管道中的某个地方出现 Map
或 Files
。
我认为我的解决方案可以进一步清理,但它似乎完成了以下工作:
- 不再需要双向映射
- 避免调用
Map.remove(...)
我建议您将 Files
的 Map
视为不同的 Observable
,以较慢的间隔发出全新的 Map
:
Observable<HashMap<Integer, File>> fileObservable = Observable.fromCallable(
() -> new HashMap<Integer, File>() )
.repeatWhen( completed -> completed.delay( LONGER, TimeUnit.SECONDS ));
然后在你的事件Observable
中,你可以调用.withLatestFrom( fileObservable, ( group, files ) -> {...} )
(注意:这个块仍然不完整):
Observable.just( EVENTS )
.flatMap( num -> Observable.fromIterable(
ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
.groupBy( num -> Math.abs( num % 2 ))
.repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
.withLatestFrom( fileObservable, ( group, files ) -> {
File file = files.computeIfAbsent(
group.getKey(),
Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));
group.map( Object::toString ).toList()
.subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));
return files;
} )
到目前为止一切顺利,您将获得与您的活动一起提供的最新一组 Files
。接下来您必须处理 Files
。我认为您可以使用 distinctUntilChanged()
来做到这一点。它应该非常有效,因为它会在幕后调用 HashMap.equals(...)
并且 Map
对象的标识大部分时间都不会改变。 HashMap.equals(...)
首先检查相同的身份。
由于此时您真正感兴趣的是处理 previous 组发出的 Files
而不是当前的,您可以使用 .scan(( prev, current ) -> {...} )
操作员。这样,这是上面完成的代码块:
Observable.just( EVENTS )
.flatMap( num -> Observable.fromIterable(
ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
.groupBy( num -> Math.abs( num % 2 ))
.repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
.withLatestFrom( fileObservable, ( group, files ) -> {
File file = files.computeIfAbsent(
group.getKey(),
Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));
group.map( Object::toString ).toList()
.subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));
return files;
} )
.distinctUntilChanged()
.scan(( prev, current ) -> {
Observable.fromIterable( prev.entrySet() )
.map( Entry::getValue )
.subscribe( file -> System.out.println( "File - '" + file + "', Lines - " +
FileUtils.readLines( file, StandardCharsets.UTF_8 )));
return current;
} )
.subscribe();
Thread.sleep( SLEEP );
比您原来的解决方案稍长,但可能会解决几个问题。
我正在尝试实现以下行为:
- 定期有事件流polled/generated(持续时间短,比如 1 秒)
- 然后根据某些内部特征对事件进行分组。
- 每组事件都立即写入一个匹配文件(这是我要维护的行为的关键)
- 文件预计将在后续事件中重新用于匹配组(具有相同的密钥),直到它们 sealed/rotated
- 如果持续时间较长(比如 5 秒),文件会 sealed/rotated 并根据使用额外的订阅者采取行动
我编写了以下示例代码来实现上述行为:
private static final Integer EVENTS = 3;
private static final Long SHORTER = 1L;
private static final Long LONGER = 5L;
private static final Long SLEEP = 100000L;
public static void main(final String[] args) throws Exception {
val files = new DualHashBidiMap<Integer, File>();
Observable.just(EVENTS)
.flatMap(num -> Observable.fromIterable(ThreadLocalRandom.current().ints(num).boxed().collect(Collectors.toList())))
.groupBy(num -> Math.abs(num % 2))
.repeatWhen(completed -> completed.delay(SHORTER, TimeUnit.SECONDS))
.map(group -> {
val file = files.computeIfAbsent(group.getKey(), Unchecked.function(key -> File.createTempFile(String.format("%03d-", key), ".txt")));
group.map(Object::toString).toList().subscribe(lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true));
return file;
})
.buffer(LONGER, TimeUnit.SECONDS)
.flatMap(Observable::fromIterable)
.distinct(File::getName)
.doOnNext(files::removeValue)
.doOnNext(file -> System.out.println("File - '" + file + "', Lines - " + FileUtils.readLines(file, StandardCharsets.UTF_8)))
.subscribe();
Thread.sleep(SLEEP);
}
虽然它按预期工作(暂时搁置地图访问的线程安全问题,我使用 commons-collections4
的 bidi-map 只是为了演示),我想知道是否有以 纯 RX 形式实现上述功能的方法,而不依赖于外部地图访问?
请注意,关键 在组创建后立即写入文件,这意味着我们必须使文件在生成的事件组范围之外存活
提前致谢。
有趣的问题..我可能是错的,但我认为你无法避免在管道中的某个地方出现 Map
或 Files
。
我认为我的解决方案可以进一步清理,但它似乎完成了以下工作:
- 不再需要双向映射
- 避免调用
Map.remove(...)
我建议您将 Files
的 Map
视为不同的 Observable
,以较慢的间隔发出全新的 Map
:
Observable<HashMap<Integer, File>> fileObservable = Observable.fromCallable(
() -> new HashMap<Integer, File>() )
.repeatWhen( completed -> completed.delay( LONGER, TimeUnit.SECONDS ));
然后在你的事件Observable
中,你可以调用.withLatestFrom( fileObservable, ( group, files ) -> {...} )
(注意:这个块仍然不完整):
Observable.just( EVENTS )
.flatMap( num -> Observable.fromIterable(
ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
.groupBy( num -> Math.abs( num % 2 ))
.repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
.withLatestFrom( fileObservable, ( group, files ) -> {
File file = files.computeIfAbsent(
group.getKey(),
Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));
group.map( Object::toString ).toList()
.subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));
return files;
} )
到目前为止一切顺利,您将获得与您的活动一起提供的最新一组 Files
。接下来您必须处理 Files
。我认为您可以使用 distinctUntilChanged()
来做到这一点。它应该非常有效,因为它会在幕后调用 HashMap.equals(...)
并且 Map
对象的标识大部分时间都不会改变。 HashMap.equals(...)
首先检查相同的身份。
由于此时您真正感兴趣的是处理 previous 组发出的 Files
而不是当前的,您可以使用 .scan(( prev, current ) -> {...} )
操作员。这样,这是上面完成的代码块:
Observable.just( EVENTS )
.flatMap( num -> Observable.fromIterable(
ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
.groupBy( num -> Math.abs( num % 2 ))
.repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
.withLatestFrom( fileObservable, ( group, files ) -> {
File file = files.computeIfAbsent(
group.getKey(),
Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));
group.map( Object::toString ).toList()
.subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));
return files;
} )
.distinctUntilChanged()
.scan(( prev, current ) -> {
Observable.fromIterable( prev.entrySet() )
.map( Entry::getValue )
.subscribe( file -> System.out.println( "File - '" + file + "', Lines - " +
FileUtils.readLines( file, StandardCharsets.UTF_8 )));
return current;
} )
.subscribe();
Thread.sleep( SLEEP );
比您原来的解决方案稍长,但可能会解决几个问题。