RxJava 代码执行流程 - map 与 flatMap

RxJava Code Execution Flow - map vs. flatMap

在使用 JerseyRx 和 RxMongo 驱动程序编写 Web 爬虫时,我很难尝试学习 RxJava 和 Reactive 编程。我的代码如下所示:

    Observable.interval( 200, TimeUnit.MILLISECONDS ).map(tick -> links(linksCollection)
            .map(linkDoc -> httpGet(client, linkDoc.getString("url"))
                    .map(htmlDoc -> parseLinks(htmlDoc)).subscribe() )
            .subscribe())
    .subscribe();

links()和httpGet()的签名如下:

Observable<Document> links(MongoCollection<Document> linkCollection)
Observable<String> httpGet(RxClient<RxObservableInvoker> client, String url)
List<HtmlLink> parseLinks(final String html)

假设方法在调用时打印它们的名称,则输出如下所示:

友情链接 得到 链接 解析 链接 链接 链接 友情链接

Get 和 Parse 只被调用一次。 谁能解释一下为什么程序流程是这样的,以及如何解决这个问题。

为了简洁起见,未在该问题中声明的对象被省略

当你想触发异步sub-process(比如为link获取http文档)时,你应该使用flatMap。它将展平 sub-Observable 并将其项目作为输出 Observable 中的另一个顺序 "burst" 发射:

Observable.interval(200, TimeUnit.MILLISECONDS)
    //this is a continuous flow of n documents repeated every 200ms:
    .flatMap(tick -> links(linksCollection))
    //this is a continous Observable<String> with n Strings every 200ms
    .flatMap(linkDoc -> httpGet(client, linkDoc.getString("url"))
    //here you have a flow of httpDocuments, get the list of links for each one
    .map(htmlDoc -> parseLinks(htmlDoc);

将此链影响到变量会产生一个您可以订阅的 Observable<List<HtmlLink>>