如何收集一个 Observable<ResultSet> 到 Map

How to collect a Observable<ResultSet> to Map

我正在尝试从 Vertex AsyncSqlClient 收集 Observable<ResultSet> 到 HashMap。

Map<String, Integer> map = Maps.newHashMap();
asyncSQLClient
        .getConnectionObservable()
        .flatMap(sqlConnection -> sqlConnection.queryObservable("select a, b from table")
                .doOnCompleted(sqlConnection::close)
                .doOnError(throwable -> log.error("Error while querying.", throwable)))
        .flatMap(resultSet -> Observable.from(resultSet.getRows()))
        .toBlocking()
        .forEach(row -> map.put(row.getString("a"), row.getInteger("b")));

但这似乎会永远阻塞。

找了半天没有结果,能不能帮帮我?

正如@Phoenix Wang 评论的那样,forEach() 方法will block until Observable will completes。这意味着如果你有 Observable 发出无限项,或者你有一个 Observable 不正确地不发出 onCompleted 来发出 Observable 完成信号,它将永远阻塞。

这可能是由您的 getConnectionObservable() 方法实现引起的,如果它创建了一个自定义 Observable,例如使用 Observable.create(),您必须在所有项目完成后调用 onCompleted()发射。

无论如何,你应该知道 toBlocking() 会阻塞并等待,这可能不适合生产代码(因为它打破了反应性的所有目的),你可以使用 reduce():

 asyncSQLClient
            .getConnectionObservable()
            .flatMap(sqlConnection -> sqlConnection.queryObservable("select a, b from table")
                    .doOnCompleted(sqlConnection::close)
                    .doOnError(throwable -> log.error("Error while querying.", throwable)))
            .flatMap(resultSet -> Observable.from(resultSet.getRows()))
            .reduce(Maps.newHashMap(), (map, o) -> map.put(row.getString("a"), row.getInteger("b")))
            .subscribe(map -> {
                        //do something with map
                    }
            );

请注意,您仍然需要解决 onCompleted 问题,因为 reduce 还需要 Observable 完成,并且会在源 Observable 完成时发出信号项。
其他选项是使用 scan()(只需将 reduce 替换为 scan),通过扫描,您将获得源 Observable 发出的每个项目的排放,这意味着您将随着时间的推移积累要映射的项目。