akka流中的异步调用
Async call in akka stream flow
我想用这样的逻辑实现 Diff 转换流:
我有一些 Source[JsValue]
和一些带有 id->JsValue 数据的数据库。我想从 Source
和数据库中的 JsValue
获取差异,然后进一步拉取生成的差异并将 JsValue
从源存储到数据库。
我考虑将 Akka-persistence 作为存储实现,但我只需要当前状态,因此它可以是任何键值数据库。
由于我是 akka-stream 的新手,我不明白实现这个想法的最佳方法是什么。
假设您的 JsValue
对象有一个 "id" 字段,您可以编写一个查询 Flow
来接收原始的 JsValue
并生成原始的元组和数据库版本:
def dbQuery(id : String) : JsValue = ???
val queryFlow : Flow[JsValue, (JsValue,JsValue), _] =
Flow[JsValue] map { originalJs =>
originalJs -> dbQuery((originalJs \ "id").as[String])
}
这些元组可以传递到差异 Flow
:
def diffJs(original : JsValue, dbVersion : JsValue) : JsValue = ???
val diffFlow : Flow[(JsValue, JsValue), JsValue, _] =
Flow[(JsValue, JsValue)] map diffJs.tupled
你提到的最后一部分是一个数据库,它会保留差异, 和 Sink
:
val dbSink : Sink[JsValue, _] = ???
然后可以根据您的价值来源组合所有这些组件以形成流:
val jsSource : Source[JsValue, _] = ???
jsSource via queryFlow via diffFlow runWith dbSink
有关异步数据库查询的示例,请参阅 演示 mapAsync
。
我想用这样的逻辑实现 Diff 转换流:
我有一些 Source[JsValue]
和一些带有 id->JsValue 数据的数据库。我想从 Source
和数据库中的 JsValue
获取差异,然后进一步拉取生成的差异并将 JsValue
从源存储到数据库。
我考虑将 Akka-persistence 作为存储实现,但我只需要当前状态,因此它可以是任何键值数据库。
由于我是 akka-stream 的新手,我不明白实现这个想法的最佳方法是什么。
假设您的 JsValue
对象有一个 "id" 字段,您可以编写一个查询 Flow
来接收原始的 JsValue
并生成原始的元组和数据库版本:
def dbQuery(id : String) : JsValue = ???
val queryFlow : Flow[JsValue, (JsValue,JsValue), _] =
Flow[JsValue] map { originalJs =>
originalJs -> dbQuery((originalJs \ "id").as[String])
}
这些元组可以传递到差异 Flow
:
def diffJs(original : JsValue, dbVersion : JsValue) : JsValue = ???
val diffFlow : Flow[(JsValue, JsValue), JsValue, _] =
Flow[(JsValue, JsValue)] map diffJs.tupled
你提到的最后一部分是一个数据库,它会保留差异,Sink
:
val dbSink : Sink[JsValue, _] = ???
然后可以根据您的价值来源组合所有这些组件以形成流:
val jsSource : Source[JsValue, _] = ???
jsSource via queryFlow via diffFlow runWith dbSink
有关异步数据库查询的示例,请参阅 mapAsync
。