akka流中的异步调用

Async call in akka stream flow

我想用这样的逻辑实现 Diff 转换流:

我有一些 Source[JsValue] 和一些带有 id->JsValue 数据的数据库。我想从 Source 和数据库中的 JsValue 获取差异,然后进一步拉取生成的差异并将 JsValue 从源存储到数据库。

我考虑将 Akka-persistence 作为存储实现,但我只需要当前状态,因此它可以是任何键值数据库。

由于我是 akka-stream 的新手,我不明白实现这个想法的最佳方法是什么。

假设您的 JsValue 对象有一个 "id" 字段,您可以编写一个查询 Flow 来接收原始的 JsValue 并生成原始的元组和数据库版本:

def dbQuery(id : String) : JsValue = ???

val queryFlow : Flow[JsValue, (JsValue,JsValue), _] = 
  Flow[JsValue] map { originalJs =>
    originalJs -> dbQuery((originalJs \ "id").as[String])
  }

这些元组可以传递到差异 Flow:

def diffJs(original : JsValue, dbVersion : JsValue) : JsValue = ???

val diffFlow : Flow[(JsValue, JsValue), JsValue, _] = 
  Flow[(JsValue, JsValue)] map diffJs.tupled

你提到的最后一部分是一个数据库,它会保留差异,Sink:

val dbSink : Sink[JsValue, _] = ???

然后可以根据您的价值来源组合所有这些组件以形成流:

val jsSource : Source[JsValue, _] = ???

jsSource via queryFlow via diffFlow runWith dbSink

有关异步数据库查询的示例,请参阅 演示 mapAsync