Bikram Shrestha - 2 months ago
Java Question

Data not getting aggregated in resultSet for java rxObservable

I am trying to call a stored procedure three times. When I run the code below, only the data from the last call to the stored procedure shows up in resultSet.getRows(). The data from the previous two calls doesn't appear in the result set. My code follows. Am I doing something wrong? Can anyone help?

String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
JsonArray jsonArray = new JsonArray();

database.dbObject().getConnectionObservable().subscribe(
        connection -> {
            Observable<ResultSet> resultSetObservable = connection
                    .callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")), jsonArray)
                    .flatMap(result -> connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray))
                    .flatMap(result -> connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")), jsonArray));

            resultSetObservable.subscribe(resultSet -> {
                handler.handle(ReportUtils.parseSQLResult(resultSet.getRows()));
            }, error -> {
                error.printStackTrace();
            }, connection::close);

        }, err -> {
            err.printStackTrace();
        }
);

Answer

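@Bharath Mg, I modified your pseudocode and it now works for me. Chaining the calls with flatMap replaces each earlier result with the next call's result, so only the last ResultSet ever reaches the subscriber. Observable.zip waits for all three calls and lets me merge their rows into a single list.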

String currentPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod   = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod   = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));


database.dbObject().getConnectionObservable().subscribe(
        connection -> {

            Observable<ResultSet> firstCall  = connection.queryWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")));
            Observable<ResultSet> secondCall = connection.queryWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")));
            Observable<ResultSet> thirdCall  = connection.queryWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")));

            Observable.zip(firstCall, secondCall, thirdCall, new Func3<ResultSet, ResultSet, ResultSet, List<JsonObject>>() {
                @Override
                public List<JsonObject> call(ResultSet resultSet, ResultSet resultSet2, ResultSet resultSet3) {
                    List<JsonObject> allRecord = new ArrayList<JsonObject>();
                    allRecord.addAll(resultSet.getRows());
                    allRecord.addAll(resultSet2.getRows());
                    allRecord.addAll(resultSet3.getRows());
                    return allRecord;
                }
            }).subscribe(resultSet -> {
                handler.handle(resultSet);
            },error -> {
                error.printStackTrace();
            },connection::close);

        },err -> {
            err.printStackTrace();
        }
);
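
To see in isolation why the chained version returns only the last call's rows while the combined version returns all of them, here is a minimal JDK-only sketch. It is an analogue, not the Vert.x/RxJava code above: CompletableFuture.thenCompose stands in for flatMap and thenCombine stands in for Observable.zip. The class name CombineResultsSketch, the call(...) helper, and the row strings are all hypothetical and only there for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CombineResultsSketch {

    // Hypothetical stand-in for one stored-procedure call; yields that call's rows.
    static CompletableFuture<List<String>> call(String period) {
        return CompletableFuture.supplyAsync(
                () -> Arrays.asList(period + "-row1", period + "-row2"));
    }

    // Chained (flatMap-like): each step ignores the previous result,
    // so only the rows of the last call survive.
    static List<String> chained() {
        return call("current")
                .thenCompose(ignored -> call("prior"))
                .thenCompose(ignored -> call("today"))
                .join();
    }

    // Combined (zip-like): all three results are available together
    // and their rows are merged into one list.
    static List<String> combined() {
        CompletableFuture<List<String>> first = call("current");
        CompletableFuture<List<String>> second = call("prior");
        CompletableFuture<List<String>> third = call("today");
        return first
                .thenCombine(second, (a, b) -> {
                    List<String> all = new ArrayList<>(a);
                    all.addAll(b);
                    return all;
                })
                .thenCombine(third, (ab, c) -> {
                    ab.addAll(c);
                    return ab;
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(chained());   // rows of the last call only
        System.out.println(combined());  // rows of all three calls
    }
}
```

In this sketch chained() yields two rows and combined() yields six, which mirrors the behaviour described in the question and the fix in the answer.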