如何收集一个 Observable<ResultSet> 到 Map
How to collect a Observable<ResultSet> to Map
我正在尝试从 Vertex AsyncSqlClient 收集 Observable<ResultSet>
到 HashMap。
Map<String, Integer> map = Maps.newHashMap();
asyncSQLClient
.getConnectionObservable()
.flatMap(sqlConnection -> sqlConnection.queryObservable("select a, b from table")
.doOnCompleted(sqlConnection::close)
.doOnError(throwable -> log.error("Error while querying.", throwable)))
.flatMap(resultSet -> Observable.from(resultSet.getRows()))
.toBlocking()
.forEach(row -> map.put(row.getString("a"), row.getInteger("b")));
但这似乎会永远阻塞。
找了半天没有结果,能不能帮帮我?
正如@Phoenix Wang 评论的那样,forEach()
方法will block until Observable will completes。这意味着如果你有 Observable
发出无限项,或者你有一个 Observable 不正确地不发出 onCompleted
来发出 Observable 完成信号,它将永远阻塞。
这可能是由您的 getConnectionObservable()
方法实现引起的,如果它创建了一个自定义 Observable
,例如使用 Observable.create()
,您必须在所有项目完成后调用 onCompleted()
发射。
无论如何,你应该知道 toBlocking()
会阻塞并等待,这可能不适合生产代码(因为它打破了反应性的所有目的),你可以使用 reduce()
:
asyncSQLClient
.getConnectionObservable()
.flatMap(sqlConnection -> sqlConnection.queryObservable("select a, b from table")
.doOnCompleted(sqlConnection::close)
.doOnError(throwable -> log.error("Error while querying.", throwable)))
.flatMap(resultSet -> Observable.from(resultSet.getRows()))
.reduce(Maps.newHashMap(), (map, o) -> map.put(row.getString("a"), row.getInteger("b")))
.subscribe(map -> {
//do something with map
}
);
请注意,您仍然需要解决 onCompleted
问题,因为 reduce 还需要 Observable
完成,并且会在源 Observable
完成时发出信号项。
其他选项是使用 scan()
(只需将 reduce 替换为 scan),通过扫描,您将获得源 Observable
发出的每个项目的排放,这意味着您将随着时间的推移积累要映射的项目。
我正在尝试从 Vertex AsyncSqlClient 收集 Observable<ResultSet>
到 HashMap。
Map<String, Integer> map = Maps.newHashMap();
asyncSQLClient
.getConnectionObservable()
.flatMap(sqlConnection -> sqlConnection.queryObservable("select a, b from table")
.doOnCompleted(sqlConnection::close)
.doOnError(throwable -> log.error("Error while querying.", throwable)))
.flatMap(resultSet -> Observable.from(resultSet.getRows()))
.toBlocking()
.forEach(row -> map.put(row.getString("a"), row.getInteger("b")));
但这似乎会永远阻塞。
找了半天没有结果,能不能帮帮我?
正如@Phoenix Wang 评论的那样,forEach()
方法will block until Observable will completes。这意味着如果你有 Observable
发出无限项,或者你有一个 Observable 不正确地不发出 onCompleted
来发出 Observable 完成信号,它将永远阻塞。
这可能是由您的 getConnectionObservable()
方法实现引起的,如果它创建了一个自定义 Observable
,例如使用 Observable.create()
,您必须在所有项目完成后调用 onCompleted()
发射。
无论如何,你应该知道 toBlocking()
会阻塞并等待,这可能不适合生产代码(因为它打破了反应性的所有目的),你可以使用 reduce()
:
asyncSQLClient
.getConnectionObservable()
.flatMap(sqlConnection -> sqlConnection.queryObservable("select a, b from table")
.doOnCompleted(sqlConnection::close)
.doOnError(throwable -> log.error("Error while querying.", throwable)))
.flatMap(resultSet -> Observable.from(resultSet.getRows()))
.reduce(Maps.newHashMap(), (map, o) -> map.put(row.getString("a"), row.getInteger("b")))
.subscribe(map -> {
//do something with map
}
);
请注意,您仍然需要解决 onCompleted
问题,因为 reduce 还需要 Observable
完成,并且会在源 Observable
完成时发出信号项。
其他选项是使用 scan()
(只需将 reduce 替换为 scan),通过扫描,您将获得源 Observable
发出的每个项目的排放,这意味着您将随着时间的推移积累要映射的项目。