FS2 运行 按顺序流
FS2 Running streams in sequence
我有一个相当简单的用例。我有两个 Web 服务调用,一个获取产品,另一个获取关系。我想 运行 fetchProducts() 首先从产品集中提取一个字段,然后将输出传递给 fetchRelationships(ids: Seq[String]),这样我就可以在产品上重新设置关系。这是代码:
def fetchProducts(): Stream[IO, Seq[Product]]= {
//webservice call
}
def fetchRelationship(ids: Seq[Product]): Stream[IO, Seq[Relationship]] = {
//webservice call
}
//Pseudocode. How can I do this with fs2 Streams?
def process = {
val prods = fetchProducts() //run this first
val prodIds = prods.flatMap(i => i.productId)
val rels = fetchRelationships(prodIds) //run after all all products are fetched
prods.forEach(p => p.setRelation(rels.get(p.id))
}
}
case class Product(productId: Option[String],
name: Option[String],
description: Option[String],
brandName: Option[String])
我受制于外部Api无法批量获取结果。所以我不确定如何使用 fs2 来表达它,或者我是否应该使用它。
不幸的是,您在问题中的代码与您的文字描述不符,并且遗漏了很多重要的部分(例如整个 Relationship
class)。也不清楚
I am constrained by the external Api to get the results in batches
确实是这个意思。也不清楚为什么 Product
中的所有字段包括 productId
都是 Option
.
以下代码可以编译,可能是也可能不是您需要的代码:
case class Product(productId: Option[String],
name: Option[String],
description: Option[String],
brandName: Option[String],
relationships: mutable.ListBuffer[Relationship]) {
}
case class Relationship(productId: String, someInfo: String)
def fetchProducts(): Stream[IO, Seq[Product]] = {
//webservice call
???
}
// def fetchRelationships(ids: Seq[Product]): Stream[IO, Seq[Relationship]] = {
def fetchRelationships(ids: Seq[String]): Stream[IO, Seq[Relationship]] = {
//webservice call
???
}
def process(): = {
val prods = fetchProducts() //run this first
val prodsAndRels: Stream[IO, (Seq[Product], Seq[Relationship])] = prods.flatMap(ps => fetchRelationships(ps.map(p => p.productId.get)).map(rs => (ps, rs)))
val prodsWithFilledRels: Stream[IO, immutable.Seq[Product]] = prodsAndRels.map({ case (ps, rs) => {
val productsMap = ps.map(p => (p.productId.get, p)).toMap
rs.foreach(rel => productsMap(rel.productId).relationships += rel)
ps.toList
}
})
prodsWithFilledRels
}
我有一个相当简单的用例。我有两个 Web 服务调用,一个获取产品,另一个获取关系。我想 运行 fetchProducts() 首先从产品集中提取一个字段,然后将输出传递给 fetchRelationships(ids: Seq[String]),这样我就可以在产品上重新设置关系。这是代码:
def fetchProducts(): Stream[IO, Seq[Product]]= {
//webservice call
}
def fetchRelationship(ids: Seq[Product]): Stream[IO, Seq[Relationship]] = {
//webservice call
}
//Pseudocode. How can I do this with fs2 Streams?
def process = {
val prods = fetchProducts() //run this first
val prodIds = prods.flatMap(i => i.productId)
val rels = fetchRelationships(prodIds) //run after all all products are fetched
prods.forEach(p => p.setRelation(rels.get(p.id))
}
}
case class Product(productId: Option[String],
name: Option[String],
description: Option[String],
brandName: Option[String])
我受制于外部Api无法批量获取结果。所以我不确定如何使用 fs2 来表达它,或者我是否应该使用它。
不幸的是,您在问题中的代码与您的文字描述不符,并且遗漏了很多重要的部分(例如整个 Relationship
class)。也不清楚
I am constrained by the external Api to get the results in batches
确实是这个意思。也不清楚为什么 Product
中的所有字段包括 productId
都是 Option
.
以下代码可以编译,可能是也可能不是您需要的代码:
case class Product(productId: Option[String],
name: Option[String],
description: Option[String],
brandName: Option[String],
relationships: mutable.ListBuffer[Relationship]) {
}
case class Relationship(productId: String, someInfo: String)
def fetchProducts(): Stream[IO, Seq[Product]] = {
//webservice call
???
}
// def fetchRelationships(ids: Seq[Product]): Stream[IO, Seq[Relationship]] = {
def fetchRelationships(ids: Seq[String]): Stream[IO, Seq[Relationship]] = {
//webservice call
???
}
def process(): = {
val prods = fetchProducts() //run this first
val prodsAndRels: Stream[IO, (Seq[Product], Seq[Relationship])] = prods.flatMap(ps => fetchRelationships(ps.map(p => p.productId.get)).map(rs => (ps, rs)))
val prodsWithFilledRels: Stream[IO, immutable.Seq[Product]] = prodsAndRels.map({ case (ps, rs) => {
val productsMap = ps.map(p => (p.productId.get, p)).toMap
rs.foreach(rel => productsMap(rel.productId).relationships += rel)
ps.toList
}
})
prodsWithFilledRels
}