使用 Akka 流如何从两个流中形成复合案例 class 对象?

Using Akka streams how can I form a composite case class object from two flows?

这是我的案例类:

case class User(id: String, location: Option[Location] = None, age: Option[String] = None) {

  override def toString: String = s"User: $id from ${location.toString} age of $age"
}

case class UserBookRating(user: User, bookISBN: String, rating: String)

如您所见,UserBookRating 取决于 User

我正在从两个单独的 csv 文件流式传输数据:Users.csv 和 BookRatings.csv

每个文件的示例行:

id, location, age

"10";"chicago, illinois, usa";"26"

id, isbn, rating

"10";"10240528340","5"

我正在使用带有 GraphDSL 的 Akka 流,我有两个来自每个文件的源流。我的问题是如何从两个流中获取数据以形成 BookRatings 对象,因为它取决于用户。现在我有两个单独的流,它们从 Users.csv 形成 User 对象,另一个流形成 BookRating 对象,但是只有 User 的 id 字段填充 BookRating 对象,因为这是我从 [=27= 知道的唯一信息] 文件。我如何组合来自流的数据以便形成 BookRating 对象?

在图表中创建一个 ZipWith 阶段。在 ZipWith 的构造函数中,您传入一个组合器函数,该函数转换两个 csv 流的输出的元组并输出一个 BookRating 流。