使用 Akka 流如何从两个流中形成复合案例 class 对象?
Using Akka streams how can I form a composite case class object from two flows?
这是我的案例类:
case class User(id: String, location: Option[Location] = None, age: Option[String] = None) {
override def toString: String = s"User: $id from ${location.toString} age of $age"
}
case class UserBookRating(user: User, bookISBN: String, rating: String)
如您所见,UserBookRating 取决于 User
我正在从两个单独的 csv 文件流式传输数据:Users.csv 和 BookRatings.csv
每个文件的示例行:
id, location, age
"10";"chicago, illinois, usa";"26"
id, isbn, rating
"10";"10240528340","5"
我正在使用带有 GraphDSL 的 Akka 流,我有两个来自每个文件的源流。我的问题是如何从两个流中获取数据以形成 BookRatings 对象,因为它取决于用户。现在我有两个单独的流,它们从 Users.csv 形成 User 对象,另一个流形成 BookRating 对象,但是只有 User 的 id 字段填充 BookRating 对象,因为这是我从 [=27= 知道的唯一信息] 文件。我如何组合来自流的数据以便形成 BookRating 对象?
在图表中创建一个 ZipWith
阶段。在 ZipWith
的构造函数中,您传入一个组合器函数,该函数转换两个 csv 流的输出的元组并输出一个 BookRating
流。
这是我的案例类:
case class User(id: String, location: Option[Location] = None, age: Option[String] = None) {
override def toString: String = s"User: $id from ${location.toString} age of $age"
}
case class UserBookRating(user: User, bookISBN: String, rating: String)
如您所见,UserBookRating 取决于 User
我正在从两个单独的 csv 文件流式传输数据:Users.csv 和 BookRatings.csv
每个文件的示例行:
id, location, age
"10";"chicago, illinois, usa";"26"
id, isbn, rating
"10";"10240528340","5"
我正在使用带有 GraphDSL 的 Akka 流,我有两个来自每个文件的源流。我的问题是如何从两个流中获取数据以形成 BookRatings 对象,因为它取决于用户。现在我有两个单独的流,它们从 Users.csv 形成 User 对象,另一个流形成 BookRating 对象,但是只有 User 的 id 字段填充 BookRating 对象,因为这是我从 [=27= 知道的唯一信息] 文件。我如何组合来自流的数据以便形成 BookRating 对象?
在图表中创建一个 ZipWith
阶段。在 ZipWith
的构造函数中,您传入一个组合器函数,该函数转换两个 csv 流的输出的元组并输出一个 BookRating
流。