数据未在 java rxObservable 的结果集中聚合
Data not getting aggregated in resultSet for java rxObservable
我正在尝试调用存储过程 3 次。当我运行下面的代码时,最后一次调用存储过程的数据只显示在resultSet.getRows()
里面。前两次调用存储过程的数据没有出现在 resultSet.Following 是我的代码中。难道我做错了什么。谁能帮忙?
String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
JsonArray jsonArray = new JsonArray();
database.dbObject().getConnectionObservable().subscribe(
connection -> {
Observable<ResultSet> resultSetObservable = connection.callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")),jsonArray ).
flatMap(result -> connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray ).
flatMap(result -> connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")),jsonArray );
resultSetObservable.subscribe(resultSet -> {
handler.handle(ReportUtils.parseSQLResult(resultSet.getRows()));
},error -> {
error.printStackTrace();
},connection::close);
},err -> {
err.printStackTrace();
}
);
你想做的事情可以通过使用 combineLatest 运算符来实现,它给出所有可观察对象的结果(它将等待所有可观察对象给出结果)
参考 http://reactivex.io/documentation/operators/combinelatest.html
伪代码,
String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
JsonArray jsonArray = new JsonArray();
database.dbObject().getConnectionObservable().subscribe(
connection -> {
resultSetObservable = Observable.combineLatest(firstCall, secCall, thirdCall)
firstCall = connection.callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")),jsonArray )
secCall = result -> connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray )
thirdCall = result -> connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")),jsonArray )
resultSetObservable.subscribe(firstRes, secRes, thirdRes -> {
handler.handle(ReportUtils.parseSQLResult(resultSet.getRows()));
},error -> {
error.printStackTrace();
},connection::close);
},err -> {
err.printStackTrace();
}
);
@Bharath Mg。我修改了伪代码,它对我有用。
String currentPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
database.dbObject().getConnectionObservable().subscribe(
connection -> {
Observable<ResultSet> firstCall = connection.queryWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")));
Observable<ResultSet> secondCall = connection.queryWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")));
Observable<ResultSet> thirdCall = connection.queryWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")));
Observable.zip(firstCall, secondCall, thirdCall, new Func3<ResultSet, ResultSet, ResultSet, List<JsonObject>>() {
@Override
public List<JsonObject> call(ResultSet resultSet, ResultSet resultSet2, ResultSet resultSet3) {
List<JsonObject> allRecord = new ArrayList<JsonObject>();
allRecord.addAll(resultSet.getRows());
allRecord.addAll(resultSet2.getRows());
allRecord.addAll(resultSet3.getRows());
return allRecord;
}
}).subscribe(resultSet -> {
handler.handle(resultSet);
},error -> {
error.printStackTrace();
},connection::close);
},err -> {
err.printStackTrace();
}
);
我正在尝试调用存储过程 3 次。当我运行下面的代码时,最后一次调用存储过程的数据只显示在resultSet.getRows()
里面。前两次调用存储过程的数据没有出现在 resultSet.Following 是我的代码中。难道我做错了什么。谁能帮忙?
String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
JsonArray jsonArray = new JsonArray();
database.dbObject().getConnectionObservable().subscribe(
connection -> {
Observable<ResultSet> resultSetObservable = connection.callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")),jsonArray ).
flatMap(result -> connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray ).
flatMap(result -> connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")),jsonArray );
resultSetObservable.subscribe(resultSet -> {
handler.handle(ReportUtils.parseSQLResult(resultSet.getRows()));
},error -> {
error.printStackTrace();
},connection::close);
},err -> {
err.printStackTrace();
}
);
你想做的事情可以通过使用 combineLatest 运算符来实现,它给出所有可观察对象的结果(它将等待所有可观察对象给出结果)
参考 http://reactivex.io/documentation/operators/combinelatest.html
伪代码,
String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
JsonArray jsonArray = new JsonArray();
database.dbObject().getConnectionObservable().subscribe(
connection -> {
resultSetObservable = Observable.combineLatest(firstCall, secCall, thirdCall)
firstCall = connection.callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")),jsonArray )
secCall = result -> connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray )
thirdCall = result -> connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")),jsonArray )
resultSetObservable.subscribe(firstRes, secRes, thirdRes -> {
handler.handle(ReportUtils.parseSQLResult(resultSet.getRows()));
},error -> {
error.printStackTrace();
},connection::close);
},err -> {
err.printStackTrace();
}
);
@Bharath Mg。我修改了伪代码,它对我有用。
String currentPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
database.dbObject().getConnectionObservable().subscribe(
connection -> {
Observable<ResultSet> firstCall = connection.queryWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")));
Observable<ResultSet> secondCall = connection.queryWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")));
Observable<ResultSet> thirdCall = connection.queryWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")));
Observable.zip(firstCall, secondCall, thirdCall, new Func3<ResultSet, ResultSet, ResultSet, List<JsonObject>>() {
@Override
public List<JsonObject> call(ResultSet resultSet, ResultSet resultSet2, ResultSet resultSet3) {
List<JsonObject> allRecord = new ArrayList<JsonObject>();
allRecord.addAll(resultSet.getRows());
allRecord.addAll(resultSet2.getRows());
allRecord.addAll(resultSet3.getRows());
return allRecord;
}
}).subscribe(resultSet -> {
handler.handle(resultSet);
},error -> {
error.printStackTrace();
},connection::close);
},err -> {
err.printStackTrace();
}
);