数据未在 java rxObservable 的结果集中聚合

Data not getting aggregated in resultSet for java rxObservable

我正在尝试调用存储过程 3 次。当我运行下面的代码时,最后一次调用存储过程的数据只显示在resultSet.getRows()里面。前两次调用存储过程的数据没有出现在 resultSet.Following 是我的代码中。难道我做错了什么。谁能帮忙?

String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod   = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod   = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
 JsonArray jsonArray = new JsonArray();

        database.dbObject().getConnectionObservable().subscribe(
                connection -> {
                    Observable<ResultSet> resultSetObservable = connection.callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")),jsonArray ).                                                                              
                            flatMap(result -> connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray ).
                            flatMap(result -> connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")),jsonArray );

                    resultSetObservable.subscribe(resultSet -> {
                        handler.handle(ReportUtils.parseSQLResult(resultSet.getRows()));
                    },error -> {
                        error.printStackTrace();
                    },connection::close);

                },err -> {
                    err.printStackTrace();
                }
        );

你想做的事情可以通过使用 combineLatest 运算符来实现,它给出所有可观察对象的结果(它将等待所有可观察对象给出结果)

参考 http://reactivex.io/documentation/operators/combinelatest.html

伪代码,

       String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
      String priorPeriod   = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
      String todayPeriod   = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
     JsonArray jsonArray = new JsonArray();


    database.dbObject().getConnectionObservable().subscribe(
            connection -> {
                resultSetObservable = Observable.combineLatest(firstCall, secCall, thirdCall)
     firstCall = connection.callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")),jsonArray )

      secCall = result -> connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray )

     thirdCall = result -> connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")),jsonArray )
                resultSetObservable.subscribe(firstRes, secRes, thirdRes -> {
                    handler.handle(ReportUtils.parseSQLResult(resultSet.getRows()));
                },error -> {
                    error.printStackTrace();
                },connection::close);

            },err -> {
                err.printStackTrace();
            }
    );

@Bharath Mg。我修改了伪代码,它对我有用。

String currentPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod   = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod   = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));


database.dbObject().getConnectionObservable().subscribe(
        connection -> {

            Observable<ResultSet> firstCall  = connection.queryWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")));
            Observable<ResultSet> secondCall = connection.queryWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")));
            Observable<ResultSet> thirdCall  = connection.queryWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")));

            Observable.zip(firstCall, secondCall, thirdCall, new Func3<ResultSet, ResultSet, ResultSet, List<JsonObject>>() {
                @Override
                public List<JsonObject> call(ResultSet resultSet, ResultSet resultSet2, ResultSet resultSet3) {
                    List<JsonObject> allRecord = new ArrayList<JsonObject>();
                    allRecord.addAll(resultSet.getRows());
                    allRecord.addAll(resultSet2.getRows());
                    allRecord.addAll(resultSet3.getRows());
                    return allRecord;
                }
            }).subscribe(resultSet -> {
                handler.handle(resultSet);
            },error -> {
                error.printStackTrace();
            },connection::close);

        },err -> {
            err.printStackTrace();
        }
);