任何 returns 输入变量和结果的 Rx 运算符?

Any Rx operator that returns both the input variable and the result?

当使用 Rx(特别是 RxJava)时,是否有一个运算符将输入变量与函数的输出一起打包,以便在下一步中同时使用两者?

例如,假设我从推文 ID 列表开始,我的程序 a) 执行 REST 调用以获取该推文的文本,然后 b) 将 ID 和文本保存到本地数据库中。正常的 "map" 将 return 步骤 a 的文本,但它会丢弃原始 ID。在代码中:

Observable.from(tweet_id_list)
    .specialMap(i -> getTweetText(i))  // is there a "specialMap" which returns the string result AND ALSO the id?
    .map((i, s) -> saveToLocalDB(i, s)  // because we need both for the ensuing function

我将 getTweetText(i) 修改为 return 一个包含两个变量而不是简单字符串的 HashMap,但如果有一个 Rx 运算符可以做到这一点而无需修改基础功能。

创建或使用通用的 Pair 类型或 Tuple 类型并使用它们映射您的流。像这样:

Observable.from(tweet_id_list)
          .map(id -> Pair.create(id,getTweetText(id))
          .map(pair -> saveToLocalDB(pair.first(),pair.second())

如果您想要一张包含所有 ID-> 文本的完整地图:

Observable.from(tweet_id_list)
    .toMap((id) -> id, (id) -> getTweetText(id));
    .subscribe((tweetMap) -> saveAllToLocalDB(tweetMap));

这将发出(假设整数 ID 和字符串文本)单个地图。如果您可以一次将整个地图保存到数据库,而不是像在原始代码中那样进行多次保存调用,这将很有用(正如我上面假设的那样)。

您想确保避免可观察调用中的副作用。数据库保存等副作用只应在您进行订阅调用并观察结果时发生,而不是在地图本身中发生。如果您需要在后台进行保存,您可以在 .subscribe 调用之前添加类似 .observeOn(Schedulers.io()) 的内容,具体取决于您的线程需求。

如果您更喜欢单个结果并一个一个地保存它们,您可能想要使用平面图以便可以并行进行查找。我在这里使用 defer 这样我们就不会立即阻止 getTweetText 调用,而是等待每个调用被订阅。这使我们能够控制执行工作的线程。请注意,由于请求可能是并发的,因此结果不一定与 tweet_id_list.

的顺序相同
Observable.from(tweet_id_list)
    .flatMap((id) -> Observable.defer(() -> Observable.just(
        Pair.create(id, getTweetText(id))).subscribeOn(Schedulers.io()))
    .subscribe((pair) -> saveToLocalDB(pair.first(), pair.second())