RxJava 阀门用例
RxJava valve use case
RxJava 中是否有运算符、外部库或我缺少的方法来创建 flowable/observable 接收控制数据发射的函数,如阀门?
我有一个巨大的 json 文件需要处理,但我必须获取文件的一部分,实体列表,处理它然后获取另一部分,我尝试使用 windows(), buffer() 但是我传递给 Flowable.generate() 的 BiFunction 在我收到第一个列表并且我还没有完成处理之后继续执行。我还尝试了 hu.akarnokd.rxjava3.operators 中的 FlowableTransformers.valve() 但它只是在处理列表
的 flatMap() 函数之前堆积了项目
private Flowable<T> flowable(InputStream inputStream) {
return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {
final var token = jsonParser.nextToken();
if (token == null) {
emitter.onComplete();
}
if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
return jsonParser;
}
if (JsonToken.START_OBJECT.equals(token)) {
emitter.onNext(reader.readValue(jsonParser));
}
return jsonParser;
}, JsonParser::close);
}
编辑:我需要控制项目的排放,以免内存和处理数据的函数过载,因为该函数读取和写入数据库,而且处理需要按顺序进行。处理数据的函数不完全是我的,它是用 RxJava 编写的,预计我会使用 Rx。
我设法这样解决了,但如果有其他方法请告诉我:
public static <T> Flowable<T> flowable(InputStream inputStream, JsonFactory jsonFactory, ObjectReader reader, Supplier<Boolean> booleanSupplier) {
return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {
if (booleanSupplier.get()) {
final var token = jsonParser.nextToken();
if (token == null) {
emitter.onComplete();
}
if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
return jsonParser;
}
if (JsonToken.START_OBJECT.equals(token)) {
emitter.onNext(reader.readValue(jsonParser));
}
}
return jsonParser;
}, JsonParser::close);
}
Edit2:这是我目前使用函数的方式之一
public Flowable<List<T>> paging(Function<List<T>, Single<List<T>>> function) {
final var atomicInteger = new AtomicInteger(0);
final var atomicBoolean = new AtomicBoolean(true);
return flowable(inputStream, jsonFactory, reader, atomicBoolean::get)
.buffer(pageSize)
.flatMapSingle(list -> {
final var counter = atomicInteger.addAndGet(1);
if (counter == numberOfPages) {
atomicBoolean.set(false);
}
return function.apply(list)
.doFinally(() -> {
if (atomicInteger.get() == numberOfPages) {
atomicInteger.set(0);
atomicBoolean.set(true);
}
});
});
}
这样解决了
public static Flowable<Object> flowable(JsonParser jsonParser, ObjectReader reader, PublishProcessor<Boolean> valve) {
return Flowable.defer(() -> {
final var token = jsonParser.nextToken();
if (token == null) {
return Completable.fromAction(jsonParser::close)
.doOnError(Throwable::printStackTrace)
.onErrorComplete()
.andThen(Flowable.empty());
}
if (JsonToken.START_OBJECT.equals(token)) {
final var value = reader.readValue(jsonParser);
final var just = Flowable.just(value).compose(FlowableTransformers.valve(valve, true));
return Flowable.concat(just, flowable(jsonParser, reader, valve));
}
return flowable(jsonParser, reader, valve);
});
}
RxJava 中是否有运算符、外部库或我缺少的方法来创建 flowable/observable 接收控制数据发射的函数,如阀门?
我有一个巨大的 json 文件需要处理,但我必须获取文件的一部分,实体列表,处理它然后获取另一部分,我尝试使用 windows(), buffer() 但是我传递给 Flowable.generate() 的 BiFunction 在我收到第一个列表并且我还没有完成处理之后继续执行。我还尝试了 hu.akarnokd.rxjava3.operators 中的 FlowableTransformers.valve() 但它只是在处理列表
的 flatMap() 函数之前堆积了项目private Flowable<T> flowable(InputStream inputStream) {
return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {
final var token = jsonParser.nextToken();
if (token == null) {
emitter.onComplete();
}
if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
return jsonParser;
}
if (JsonToken.START_OBJECT.equals(token)) {
emitter.onNext(reader.readValue(jsonParser));
}
return jsonParser;
}, JsonParser::close);
}
编辑:我需要控制项目的排放,以免内存和处理数据的函数过载,因为该函数读取和写入数据库,而且处理需要按顺序进行。处理数据的函数不完全是我的,它是用 RxJava 编写的,预计我会使用 Rx。
我设法这样解决了,但如果有其他方法请告诉我:
public static <T> Flowable<T> flowable(InputStream inputStream, JsonFactory jsonFactory, ObjectReader reader, Supplier<Boolean> booleanSupplier) {
return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {
if (booleanSupplier.get()) {
final var token = jsonParser.nextToken();
if (token == null) {
emitter.onComplete();
}
if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
return jsonParser;
}
if (JsonToken.START_OBJECT.equals(token)) {
emitter.onNext(reader.readValue(jsonParser));
}
}
return jsonParser;
}, JsonParser::close);
}
Edit2:这是我目前使用函数的方式之一
public Flowable<List<T>> paging(Function<List<T>, Single<List<T>>> function) {
final var atomicInteger = new AtomicInteger(0);
final var atomicBoolean = new AtomicBoolean(true);
return flowable(inputStream, jsonFactory, reader, atomicBoolean::get)
.buffer(pageSize)
.flatMapSingle(list -> {
final var counter = atomicInteger.addAndGet(1);
if (counter == numberOfPages) {
atomicBoolean.set(false);
}
return function.apply(list)
.doFinally(() -> {
if (atomicInteger.get() == numberOfPages) {
atomicInteger.set(0);
atomicBoolean.set(true);
}
});
});
}
这样解决了
public static Flowable<Object> flowable(JsonParser jsonParser, ObjectReader reader, PublishProcessor<Boolean> valve) {
return Flowable.defer(() -> {
final var token = jsonParser.nextToken();
if (token == null) {
return Completable.fromAction(jsonParser::close)
.doOnError(Throwable::printStackTrace)
.onErrorComplete()
.andThen(Flowable.empty());
}
if (JsonToken.START_OBJECT.equals(token)) {
final var value = reader.readValue(jsonParser);
final var just = Flowable.just(value).compose(FlowableTransformers.valve(valve, true));
return Flowable.concat(just, flowable(jsonParser, reader, valve));
}
return flowable(jsonParser, reader, valve);
});
}