链接 Observable 和 Emitting / 将原始 Emit 传递给订阅调用
Chaining Observable & Emitting / Passing Original Emit to Subscribe Call
我有一个用例,我正在使用消息、保存它们然后回复成功或失败。 mongo 插入 returns 一个 Observable,这样我就可以使用平面图进行链接。问题是 insert Observable 发出插入的结果,但我需要从第一个 observable 发出的原始消息来回复。因此,为了完成这项工作,我 运行 在第一个 Observable 的订阅内插入并在第二个订阅内回复。
我希望通过某种像 flatmap 这样的运算符以一种更具反应性的方式来完成这项工作。我搜索了运算符列表,但没有找到我要找的东西。
eb.consumer("persister.save.event").toObservable()
.subscribe(msg -> {
mongo.insertObservable("event", (JsonObject) msg.body())
.subscribe(
res -> msg.reply(new JsonObject().put("success", true)),
error -> msg.fail(500, "failed to save event"));
});
上面的代码是应该这样做还是有更好的方法?两个订阅感觉不对
以下是避免两个订阅者的方法:
eb.consumer("persister.save.event").toObservable()
.flatMap(msg -> mongo.insertObservable("event", (JsonObject) msg.body()).map(mongoResponse -> msg))
.subscribe(
res -> msg.reply(new JsonObject().put("success", true)),
error -> msg.fail(500, "failed to save event"));
诀窍是 map
你的 mongo 结果在 flatMap
.
中想要 msg
我有一个用例,我正在使用消息、保存它们然后回复成功或失败。 mongo 插入 returns 一个 Observable,这样我就可以使用平面图进行链接。问题是 insert Observable 发出插入的结果,但我需要从第一个 observable 发出的原始消息来回复。因此,为了完成这项工作,我 运行 在第一个 Observable 的订阅内插入并在第二个订阅内回复。
我希望通过某种像 flatmap 这样的运算符以一种更具反应性的方式来完成这项工作。我搜索了运算符列表,但没有找到我要找的东西。
eb.consumer("persister.save.event").toObservable()
.subscribe(msg -> {
mongo.insertObservable("event", (JsonObject) msg.body())
.subscribe(
res -> msg.reply(new JsonObject().put("success", true)),
error -> msg.fail(500, "failed to save event"));
});
上面的代码是应该这样做还是有更好的方法?两个订阅感觉不对
以下是避免两个订阅者的方法:
eb.consumer("persister.save.event").toObservable()
.flatMap(msg -> mongo.insertObservable("event", (JsonObject) msg.body()).map(mongoResponse -> msg))
.subscribe(
res -> msg.reply(new JsonObject().put("success", true)),
error -> msg.fail(500, "failed to save event"));
诀窍是 map
你的 mongo 结果在 flatMap
.
msg