GroupBy 紧随键 RxJava

GroupBy immediately following keys RxJava

我有一个简单的问题,我从 SQL 数据库中获取了一堆行。我需要折叠这些行,在多个 json 对象中减少它们。使用 rxJava,我有:

Promise<Flowable<Data>> p = Promise.promise();
p.complete(observable
             .groupBy(array -> array.getValue(0))
             .flatMap(g -> extractData(g));

问题是,当数据超过一定数量时,此代码会挂起。我觉得
我理解为什么,因为 flatMap 的并发上限,并且因为我的组永远不会结束主题并等待主要观察对象结束后再返回(reduce 需要组中的所有值才能工作)。问题是,我处理的行保证按特定键排序,与我分组的行相同。

我正在寻找一种在创建新组时关闭先前组的方法。有办法实现吗?

我认为 groupByUntil 会允许这样做,但它现在似乎已合并到 groupBy 方法中(至少在 RxJava 中),我无法找到使用 [=15 的方法=]/takeWhile

编辑:

只是意识到没有extractData的内容很难理解,这里是:

return group.reduce(new Data(), <business logic>).toFlowable()

看来我设法使用自己的运算符和 lift 方法做到了。它真的远非防弹,但它现在正在工作......很难遵循所有指南 the guide 我希望我没有搞乱背压或其他东西:

private static final class ConvertSQLRowsToSchemaInstances implements FlowableOperator<SchemaInstance, JsonArray> {
  private final SQLRowStream rows;
  private final Loader loader;
  private final Table table;
  private final List<Map.Entry<String, TableField>> wanted;
  private final List<Map.Entry<String, TableField>> unwanted;
  private final TableField unicity;

  public ConvertSQLRowsToSchemaInstances(SQLRowStream rows, Loader loader, Table table,
                                         List<Map.Entry<String, TableField>> wanted,
                                         List<Map.Entry<String, TableField>> unwanted) {
    this.rows = rows;
    this.loader = loader;
    this.table = table;
    this.wanted = wanted;
    this.unwanted = unwanted;
    this.unicity = table.getUnicityFields().get(0);
  }

  @NonNull
  @Override
  public Subscriber<? super JsonArray> apply(@NonNull Subscriber<? super SchemaInstance> subscriber) throws Exception {
    return new Op(subscriber);
  }

  private final class Op implements FlowableSubscriber<JsonArray>, Subscription {
    final Subscriber<? super SchemaInstance> child;

    SchemaInstance si = null;
    Subscription s;

    public Op(Subscriber<? super SchemaInstance> child) {
      this.child = child;
    }

    @Override
    public void onSubscribe(Subscription s) {
      this.s = s;
      child.onSubscribe(this);
    }

    @Override
    public void onNext(JsonArray array) {
      try {
        if (si == null) si = loader.getEmptyInstance(table.getSchema());
        else if (!si.get(unicity.getName()).get().equals(array.getValue(rows.column(unicity.getName())))) {
          // New schema arrived
          child.onNext(si);
          si = loader.getEmptyInstance(table.getSchema());
        }
        extractData(si, array, rows, table, loader, wanted, unwanted);
        request(1);
      } catch (UnknownTypeException | IllegalAccessException | InstantiationException | NoSuchFieldException e) {
        onError(e);
      }
    }

    @Override
    public void onError(Throwable e) {
      child.onError(e);
    }

    @Override
    public void onComplete() {
      if (si != null) child.onNext(si);
      child.onComplete();
    }

    @Override
    public void cancel() {
      s.cancel();
    }

    @Override
    public void request(long n) {
      s.request(n);
    }
  }
}

只是试图将一些行从 SQL 累积到 class 的单个实例。欢迎任何建议。