GroupBy 紧随键 RxJava
GroupBy immediately following keys RxJava
我有一个简单的问题,我从 SQL 数据库中获取了一堆行。我需要折叠这些行,在多个 json 对象中减少它们。使用 rxJava,我有:
Promise<Flowable<Data>> p = Promise.promise();
p.complete(observable
.groupBy(array -> array.getValue(0))
.flatMap(g -> extractData(g));
问题是,当数据超过一定数量时,此代码会挂起。我觉得
我理解为什么,因为 flatMap
的并发上限,并且因为我的组永远不会结束主题并等待主要观察对象结束后再返回(reduce 需要组中的所有值才能工作)。问题是,我处理的行保证按特定键排序,与我分组的行相同。
我正在寻找一种在创建新组时关闭先前组的方法。有办法实现吗?
我认为 groupByUntil
会允许这样做,但它现在似乎已合并到 groupBy
方法中(至少在 RxJava 中),我无法找到使用 [=15 的方法=]/takeWhile
编辑:
只是意识到没有extractData
的内容很难理解,这里是:
return group.reduce(new Data(), <business logic>).toFlowable()
看来我设法使用自己的运算符和 lift
方法做到了。它真的远非防弹,但它现在正在工作......很难遵循所有指南 the guide 我希望我没有搞乱背压或其他东西:
private static final class ConvertSQLRowsToSchemaInstances implements FlowableOperator<SchemaInstance, JsonArray> {
private final SQLRowStream rows;
private final Loader loader;
private final Table table;
private final List<Map.Entry<String, TableField>> wanted;
private final List<Map.Entry<String, TableField>> unwanted;
private final TableField unicity;
public ConvertSQLRowsToSchemaInstances(SQLRowStream rows, Loader loader, Table table,
List<Map.Entry<String, TableField>> wanted,
List<Map.Entry<String, TableField>> unwanted) {
this.rows = rows;
this.loader = loader;
this.table = table;
this.wanted = wanted;
this.unwanted = unwanted;
this.unicity = table.getUnicityFields().get(0);
}
@NonNull
@Override
public Subscriber<? super JsonArray> apply(@NonNull Subscriber<? super SchemaInstance> subscriber) throws Exception {
return new Op(subscriber);
}
private final class Op implements FlowableSubscriber<JsonArray>, Subscription {
final Subscriber<? super SchemaInstance> child;
SchemaInstance si = null;
Subscription s;
public Op(Subscriber<? super SchemaInstance> child) {
this.child = child;
}
@Override
public void onSubscribe(Subscription s) {
this.s = s;
child.onSubscribe(this);
}
@Override
public void onNext(JsonArray array) {
try {
if (si == null) si = loader.getEmptyInstance(table.getSchema());
else if (!si.get(unicity.getName()).get().equals(array.getValue(rows.column(unicity.getName())))) {
// New schema arrived
child.onNext(si);
si = loader.getEmptyInstance(table.getSchema());
}
extractData(si, array, rows, table, loader, wanted, unwanted);
request(1);
} catch (UnknownTypeException | IllegalAccessException | InstantiationException | NoSuchFieldException e) {
onError(e);
}
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onComplete() {
if (si != null) child.onNext(si);
child.onComplete();
}
@Override
public void cancel() {
s.cancel();
}
@Override
public void request(long n) {
s.request(n);
}
}
}
只是试图将一些行从 SQL 累积到 class 的单个实例。欢迎任何建议。
我有一个简单的问题,我从 SQL 数据库中获取了一堆行。我需要折叠这些行,在多个 json 对象中减少它们。使用 rxJava,我有:
Promise<Flowable<Data>> p = Promise.promise();
p.complete(observable
.groupBy(array -> array.getValue(0))
.flatMap(g -> extractData(g));
问题是,当数据超过一定数量时,此代码会挂起。我觉得
我理解为什么,因为 flatMap
的并发上限,并且因为我的组永远不会结束主题并等待主要观察对象结束后再返回(reduce 需要组中的所有值才能工作)。问题是,我处理的行保证按特定键排序,与我分组的行相同。
我正在寻找一种在创建新组时关闭先前组的方法。有办法实现吗?
我认为 groupByUntil
会允许这样做,但它现在似乎已合并到 groupBy
方法中(至少在 RxJava 中),我无法找到使用 [=15 的方法=]/takeWhile
编辑:
只是意识到没有extractData
的内容很难理解,这里是:
return group.reduce(new Data(), <business logic>).toFlowable()
看来我设法使用自己的运算符和 lift
方法做到了。它真的远非防弹,但它现在正在工作......很难遵循所有指南 the guide 我希望我没有搞乱背压或其他东西:
private static final class ConvertSQLRowsToSchemaInstances implements FlowableOperator<SchemaInstance, JsonArray> {
private final SQLRowStream rows;
private final Loader loader;
private final Table table;
private final List<Map.Entry<String, TableField>> wanted;
private final List<Map.Entry<String, TableField>> unwanted;
private final TableField unicity;
public ConvertSQLRowsToSchemaInstances(SQLRowStream rows, Loader loader, Table table,
List<Map.Entry<String, TableField>> wanted,
List<Map.Entry<String, TableField>> unwanted) {
this.rows = rows;
this.loader = loader;
this.table = table;
this.wanted = wanted;
this.unwanted = unwanted;
this.unicity = table.getUnicityFields().get(0);
}
@NonNull
@Override
public Subscriber<? super JsonArray> apply(@NonNull Subscriber<? super SchemaInstance> subscriber) throws Exception {
return new Op(subscriber);
}
private final class Op implements FlowableSubscriber<JsonArray>, Subscription {
final Subscriber<? super SchemaInstance> child;
SchemaInstance si = null;
Subscription s;
public Op(Subscriber<? super SchemaInstance> child) {
this.child = child;
}
@Override
public void onSubscribe(Subscription s) {
this.s = s;
child.onSubscribe(this);
}
@Override
public void onNext(JsonArray array) {
try {
if (si == null) si = loader.getEmptyInstance(table.getSchema());
else if (!si.get(unicity.getName()).get().equals(array.getValue(rows.column(unicity.getName())))) {
// New schema arrived
child.onNext(si);
si = loader.getEmptyInstance(table.getSchema());
}
extractData(si, array, rows, table, loader, wanted, unwanted);
request(1);
} catch (UnknownTypeException | IllegalAccessException | InstantiationException | NoSuchFieldException e) {
onError(e);
}
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onComplete() {
if (si != null) child.onNext(si);
child.onComplete();
}
@Override
public void cancel() {
s.cancel();
}
@Override
public void request(long n) {
s.request(n);
}
}
}
只是试图将一些行从 SQL 累积到 class 的单个实例。欢迎任何建议。