根据优先级对未来的请求进行计算
Making Calculation on Future requests based on priority
我有一个关于处理异步操作和根据优先级采取行动的问题。
考虑以下代码:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Some[Int]] = Future {
Some(1)
}
// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] = {
val domainFutures: Future[List[Option[Int]]] = Future.traverse(listIds)(listId => isSiteExcludedAtList(listId, domainHash))
val subDomainFutures: Future[List[Option[Int]]] = Future.traverse(listIds)(listId => isSiteExcludedAtList(listId, subdomainHash))
// Is there other way?
for {
res <- Future.sequence(
List(
subDomainFutures.map(res => "subdomain" -> res),
domainFutures.map(res => "domain" -> res)
)
)
} yield {
val subdomainExclusion: List[Int] = res.filter(_._1 == "subdomain").flatMap(_._2).flatten
val domainExclusion: List[Int] = res.filter(_._1 == "domain").flatMap(_._2).flatten
if (subdomainExclusion.nonEmpty) {
s"its subdomain exclusion with results: ${subdomainExclusion}"
}
else {
s"its domain exclusion with results: ${domainExclusion}"
}
}
}
我想达到的目标:
- isSiteExcludedAtList returns 来自数据库的 Int 对象,这在我的示例中是模拟的,但它实际上是异步调用以从包含数据库中的 listId 和 siteId 的某个键获取一些 int 值。
- 我想创建 subdomainFutures 和 domainFutures 并开始 运行 它们一起
- 我想检查 subdomainFutures 是否有结果,如果有 - 它的子域排除,我想 return 这个
- 如果所有 subdomainFutures 都没有 return 任何结果 - 我想根据此检查 domainFutures 和 return 结果。
注意:子域只等待一个结果是可选优化。
有没有更漂亮的方法来实现这个?
谢谢!
所以你想同时获取域和子域,你也想同时执行尽可能多的isSiteExcludedAtList
。此外,如果至少有一个子域名,您想要取消这些域名。
可以很容易地使用 cats-effect 和 fs2 来表示 IO
(以下代码假定 isSiteExcludedAtList
return 是 IO[Option[Int]]
)
import cats.effect.IO
import cats.syntax.all._
import fs2.Stream
import fs2.concurrent.SignallingRef
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): IO[Unit] = {
def parallelStreamFor(siteId: Int): Stream[IO, Int] =
Stream
.emits(listIds)
.covary[IO]
.parEvalMapUnordered(maxConcurrent = 2)(listId => isSiteExcludedAtList(listId, siteId))
.collect {
case Some(result) => result
}
SignallingRef[IO].of(false).flatMap { signal =>
val processSubdomains =
parallelStreamFor(siteId = subdomainHash)
.evalTap(_ => signal.set(true))
.compile
.toList
val processDomains =
parallelStreamFor(siteId = domainHash)
.interruptWhen(signal)
.compile
.toList
(processSubdomains,processDomains).parTupled
} flatMap {
case (subdomainExclusions, domainExclusions) =>
if (subdomainExclusions.nonEmpty)
IO.println(s"Its subdomain exclusion with result: ${subdomainExclusions}")
else if (domainExclusions.nonEmpty)
IO.println(s"Its domain exclusion with result: ${domainExclusions}")
else
IO.println("All subdomains and domains are included!")
}
}
几个注意事项:
- 如果元素的顺序很重要,则将
parEvalMapUnordered
替换为 parEvalMap
,这样效率会低一些。
- 调整
maxConcurrent
的值,使其符合您的工作量。
- 如果您希望保持每个流同步并且只是 运行 两者同时发生,我们可以将
parEvalMapUnordered
+ parEvalMapUnordered
替换为对 evalMapFilter
[=62 的单个调用=]
- 由于
IO.fromFuture
和 IO.unsafeToFuture()
,您可以轻松地将其集成到您的代码库中,而无需进行太多重构
可以看到代码运行ning here.
编辑
OLD AND WRONG ANSWER
如果我理解正确的话,你想在第一个结果处停止处理 return a Some
如果您愿意使用 cats-effect,这很容易实现,如下所示:
import cats.effect.IO
import cats.syntax.all._
def isSiteExcludedAtList(listId: Int, siteId: Int): IO[Option[Int]] =
IO.println(s"Computing for ${listId} - ${siteId}").as(Some(10))
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): IO[Unit] = {
val processSubdomains =
listIds.collectFirstSomeM(listId => isSiteExcludedAtList(listId, siteId = subdomainHash))
val processDomains =
listIds.collectFirstSomeM(listId => isSiteExcludedAtList(listId, siteId = domainHash))
processSubdomains.flatMap {
case Some(subdomainExclusion) =>
IO.println(s"Its subdomain exclusion with result: ${subdomainExclusion}")
case None =>
processDomains.flatMap {
case Some(domainExclusion) =>
IO.println(s"Its domain exclusion with result: ${domainExclusion}")
case None =>
IO.println("All subdomains and domains are included!")
}
}
}
可以看到代码 运行ning here
Note: Another approach would be to tag each computation with is origin (domain, or subdomain) and combine all them in a big list and perform a single collectFirstSomeM
both are equivalent.
我想描述如何在仍然使用 futures 的同时改进您的代码,但我对这段代码的作用有点困惑。 isSiteExcludedAtList
return 这个数字是多少?它是一个标识符,你想收集所有列表id的标识符,你只关心你不想使用domainHash
查询是否足以使用subdomainHash
?这就是您的代码似乎正在做的事情,但是如果我正确理解上面的答案,即带有 cats-effect 和 collectFirstSomeM
的答案,那么该代码只会查找第一个结果 Some(number)
然后停下来。例如,如果第一次调用 isSiteExcludedAtList
将 return Some(1)
那么我们就不会再调用任何东西了。
所以,我有三个答案给你。
- 这是如果你想收集一个整数列表,你只想避免调用
isSiteExcludedAtList
和 domainHash
如果调用 subdomainHash
已经给你一些结果。在这种情况下,您可以同时链接 Future.traverse
并仅在第一个 return 没有结果时才调用第二个。
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Some[Int]] =
Future { Some(1) }
// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
for {
res1 <- Future.traverse(listIds)(isSiteExcludedAtList(_, subdomainHash))
subIds = res1.flatten
res2 <- if (subIds.isEmpty)
Future.traverse(listIds)(isSiteExcludedAtList(_, domainHash))
else
Future.successful(Nil)
domIds = res2.flatten
} yield
if (subIds.nonEmpty)
s"its subdomain exclusion with results: ${subIds}"
else if (domIds.nonEmpty)
s"its domain exclusion with results: ${domIds}"
else
"no exclusion"
- 这是如果你查找第一个表明listId被排除的结果,然后你不想再查询了。在这种情况下,所有对
isSiteExcludedAtList
的调用都必须链接起来,即只有在前一个调用没有结果时才调用下一个调用。可以用递归来完成:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Option[Int]] =
Future { Some(1) }
def isSiteExcludedAtList(listIds: List[Int], hash: Int): Future[Option[Int]] =
listIds match {
case Nil =>
Future.successful(None)
case head :: tail =>
isSiteExcludedAtList(head, hash).flatMap {
case Some(id) => Future.successful(Some(id))
case None => isSiteExcludedAtList(tail, hash)
}
}
// if you use Scala 3, change this to an enum
sealed trait Exclusion
final case class SubdomainExclusion(id: Int) extends Exclusion
final case class DomainExclusion(id: Int) extends Exclusion
case object NoExclusion extends Exclusion
// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
isSiteExcludedAtList(listIds, subdomainHash).flatMap {
case Some(id) =>
Future.successful(SubdomainExclusion(id))
case None =>
isSiteExcludedAtList(listIds, domainHash).map {
case Some(id) => DomainExclusion(id)
case None => NoExclusion
}
}.map {
case SubdomainExclusion(id) => s"subdomain exclusion $id"
case DomainExclusion(id) => s"domain exclusion: $id"
case NoExclusion => "no exclusion"
}
- 第三种可能性是不使用
Future.traverse
并分别请求每个 listId
,您将实现一个查询,该查询将 return 给定哈希的所有排除 ID - subdomainHash
或 domainHash
,然后您只需检查您的 listIds
和由该查询编辑的 ID return 是否是 non-empty。该代码与我的第一个答案中的代码类似,但它只会对数据库进行两次调用。我写它是因为根据我的经验,这是处理数据库的一种常见模式:我们有一些已经编写的查询,随着我们的代码变得越来越复杂,我们开始在循环中使用这些查询,这导致 sub-optimal 性能,而我们可以编写更复杂的查询,我们只调用一次。
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// mock database call
def isSiteExcludedAtListBulk(siteId: Int): Future[Set[Int]] =
Future { Set(10, 20, 30) }
// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
for {
excludedSubIds <- isSiteExcludedAtListBulk(subdomainHash)
subIds = listIds.filter(excludedSubIds)
excludedDomIds <- if (subIds.isEmpty)
isSiteExcludedAtListBulk(domainHash)
else
Future.successful(Set.empty)
domIds = listIds.filter(excludedDomIds)
} yield
if (subIds.nonEmpty)
s"its subdomain exclusion with results: ${subIds}"
else if (domIds.nonEmpty)
s"its domain exclusion with results: ${domIds}"
else
"no exclusion"
也许是这样的?
subdomainFutures.map(_.flatten).flatMap {
case sds if (sds.nonEmpty) => Future.successful(sds -> Nil)
case _ => domainFutures.map(_.flatten).map(Nil -> _)
}.map {
case (sds, _) if (sds.nonEmpty) => s"subdomain exclusion $sds"
case (_, ds) if (ds.nonEmpty) => s"domain exclusion $ds"
case _ => "no exclusion"
}
或者,也许,将域查询也提升到同一级别:
subdomainFutures.zip(domainFutures)
.map { case (s,d) = (s.flatten, d.flatten) }
.map {
case (sds, _) if (sds.nonEmpty) => s"subdomain exclusion $sds"
case (_, ds) if (ds.nonEmpty) => s"domain exclusion $ds"
case _ => "no exclusion"
}
我认为,这或多或少与您所做的相同,只是在 IMO 中以更直接的方式表达。
一个缺点是它会等待所有子域查询返回,即使第一个 returns 结果(第二个变体看起来有点“光滑”,但它也会等待所有域查询无条件地,这是一个额外的低效率)。
有很多方法可以优化它(没有什么是不可能的!)但我想不出任何对我来说对用例来说看起来不会过于复杂的方法。
我有一个关于处理异步操作和根据优先级采取行动的问题。
考虑以下代码:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Some[Int]] = Future {
Some(1)
}
// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] = {
val domainFutures: Future[List[Option[Int]]] = Future.traverse(listIds)(listId => isSiteExcludedAtList(listId, domainHash))
val subDomainFutures: Future[List[Option[Int]]] = Future.traverse(listIds)(listId => isSiteExcludedAtList(listId, subdomainHash))
// Is there other way?
for {
res <- Future.sequence(
List(
subDomainFutures.map(res => "subdomain" -> res),
domainFutures.map(res => "domain" -> res)
)
)
} yield {
val subdomainExclusion: List[Int] = res.filter(_._1 == "subdomain").flatMap(_._2).flatten
val domainExclusion: List[Int] = res.filter(_._1 == "domain").flatMap(_._2).flatten
if (subdomainExclusion.nonEmpty) {
s"its subdomain exclusion with results: ${subdomainExclusion}"
}
else {
s"its domain exclusion with results: ${domainExclusion}"
}
}
}
我想达到的目标:
- isSiteExcludedAtList returns 来自数据库的 Int 对象,这在我的示例中是模拟的,但它实际上是异步调用以从包含数据库中的 listId 和 siteId 的某个键获取一些 int 值。
- 我想创建 subdomainFutures 和 domainFutures 并开始 运行 它们一起
- 我想检查 subdomainFutures 是否有结果,如果有 - 它的子域排除,我想 return 这个
- 如果所有 subdomainFutures 都没有 return 任何结果 - 我想根据此检查 domainFutures 和 return 结果。
注意:子域只等待一个结果是可选优化。
有没有更漂亮的方法来实现这个? 谢谢!
所以你想同时获取域和子域,你也想同时执行尽可能多的isSiteExcludedAtList
。此外,如果至少有一个子域名,您想要取消这些域名。
可以很容易地使用 cats-effect 和 fs2 来表示 IO
(以下代码假定 isSiteExcludedAtList
return 是 IO[Option[Int]]
)
import cats.effect.IO
import cats.syntax.all._
import fs2.Stream
import fs2.concurrent.SignallingRef
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): IO[Unit] = {
def parallelStreamFor(siteId: Int): Stream[IO, Int] =
Stream
.emits(listIds)
.covary[IO]
.parEvalMapUnordered(maxConcurrent = 2)(listId => isSiteExcludedAtList(listId, siteId))
.collect {
case Some(result) => result
}
SignallingRef[IO].of(false).flatMap { signal =>
val processSubdomains =
parallelStreamFor(siteId = subdomainHash)
.evalTap(_ => signal.set(true))
.compile
.toList
val processDomains =
parallelStreamFor(siteId = domainHash)
.interruptWhen(signal)
.compile
.toList
(processSubdomains,processDomains).parTupled
} flatMap {
case (subdomainExclusions, domainExclusions) =>
if (subdomainExclusions.nonEmpty)
IO.println(s"Its subdomain exclusion with result: ${subdomainExclusions}")
else if (domainExclusions.nonEmpty)
IO.println(s"Its domain exclusion with result: ${domainExclusions}")
else
IO.println("All subdomains and domains are included!")
}
}
几个注意事项:
- 如果元素的顺序很重要,则将
parEvalMapUnordered
替换为parEvalMap
,这样效率会低一些。 - 调整
maxConcurrent
的值,使其符合您的工作量。 - 如果您希望保持每个流同步并且只是 运行 两者同时发生,我们可以将
parEvalMapUnordered
+parEvalMapUnordered
替换为对evalMapFilter
[=62 的单个调用=] - 由于
IO.fromFuture
和IO.unsafeToFuture()
,您可以轻松地将其集成到您的代码库中,而无需进行太多重构
可以看到代码运行ning here.
编辑
OLD AND WRONG ANSWER
如果我理解正确的话,你想在第一个结果处停止处理 return a Some
如果您愿意使用 cats-effect,这很容易实现,如下所示:
import cats.effect.IO
import cats.syntax.all._
def isSiteExcludedAtList(listId: Int, siteId: Int): IO[Option[Int]] =
IO.println(s"Computing for ${listId} - ${siteId}").as(Some(10))
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): IO[Unit] = {
val processSubdomains =
listIds.collectFirstSomeM(listId => isSiteExcludedAtList(listId, siteId = subdomainHash))
val processDomains =
listIds.collectFirstSomeM(listId => isSiteExcludedAtList(listId, siteId = domainHash))
processSubdomains.flatMap {
case Some(subdomainExclusion) =>
IO.println(s"Its subdomain exclusion with result: ${subdomainExclusion}")
case None =>
processDomains.flatMap {
case Some(domainExclusion) =>
IO.println(s"Its domain exclusion with result: ${domainExclusion}")
case None =>
IO.println("All subdomains and domains are included!")
}
}
}
可以看到代码 运行ning here
Note: Another approach would be to tag each computation with is origin (domain, or subdomain) and combine all them in a big list and perform a single
collectFirstSomeM
both are equivalent.
我想描述如何在仍然使用 futures 的同时改进您的代码,但我对这段代码的作用有点困惑。 isSiteExcludedAtList
return 这个数字是多少?它是一个标识符,你想收集所有列表id的标识符,你只关心你不想使用domainHash
查询是否足以使用subdomainHash
?这就是您的代码似乎正在做的事情,但是如果我正确理解上面的答案,即带有 cats-effect 和 collectFirstSomeM
的答案,那么该代码只会查找第一个结果 Some(number)
然后停下来。例如,如果第一次调用 isSiteExcludedAtList
将 return Some(1)
那么我们就不会再调用任何东西了。
所以,我有三个答案给你。
- 这是如果你想收集一个整数列表,你只想避免调用
isSiteExcludedAtList
和domainHash
如果调用subdomainHash
已经给你一些结果。在这种情况下,您可以同时链接Future.traverse
并仅在第一个 return 没有结果时才调用第二个。
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Some[Int]] =
Future { Some(1) }
// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
for {
res1 <- Future.traverse(listIds)(isSiteExcludedAtList(_, subdomainHash))
subIds = res1.flatten
res2 <- if (subIds.isEmpty)
Future.traverse(listIds)(isSiteExcludedAtList(_, domainHash))
else
Future.successful(Nil)
domIds = res2.flatten
} yield
if (subIds.nonEmpty)
s"its subdomain exclusion with results: ${subIds}"
else if (domIds.nonEmpty)
s"its domain exclusion with results: ${domIds}"
else
"no exclusion"
- 这是如果你查找第一个表明listId被排除的结果,然后你不想再查询了。在这种情况下,所有对
isSiteExcludedAtList
的调用都必须链接起来,即只有在前一个调用没有结果时才调用下一个调用。可以用递归来完成:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Option[Int]] =
Future { Some(1) }
def isSiteExcludedAtList(listIds: List[Int], hash: Int): Future[Option[Int]] =
listIds match {
case Nil =>
Future.successful(None)
case head :: tail =>
isSiteExcludedAtList(head, hash).flatMap {
case Some(id) => Future.successful(Some(id))
case None => isSiteExcludedAtList(tail, hash)
}
}
// if you use Scala 3, change this to an enum
sealed trait Exclusion
final case class SubdomainExclusion(id: Int) extends Exclusion
final case class DomainExclusion(id: Int) extends Exclusion
case object NoExclusion extends Exclusion
// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
isSiteExcludedAtList(listIds, subdomainHash).flatMap {
case Some(id) =>
Future.successful(SubdomainExclusion(id))
case None =>
isSiteExcludedAtList(listIds, domainHash).map {
case Some(id) => DomainExclusion(id)
case None => NoExclusion
}
}.map {
case SubdomainExclusion(id) => s"subdomain exclusion $id"
case DomainExclusion(id) => s"domain exclusion: $id"
case NoExclusion => "no exclusion"
}
- 第三种可能性是不使用
Future.traverse
并分别请求每个listId
,您将实现一个查询,该查询将 return 给定哈希的所有排除 ID -subdomainHash
或domainHash
,然后您只需检查您的listIds
和由该查询编辑的 ID return 是否是 non-empty。该代码与我的第一个答案中的代码类似,但它只会对数据库进行两次调用。我写它是因为根据我的经验,这是处理数据库的一种常见模式:我们有一些已经编写的查询,随着我们的代码变得越来越复杂,我们开始在循环中使用这些查询,这导致 sub-optimal 性能,而我们可以编写更复杂的查询,我们只调用一次。
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// mock database call
def isSiteExcludedAtListBulk(siteId: Int): Future[Set[Int]] =
Future { Set(10, 20, 30) }
// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
for {
excludedSubIds <- isSiteExcludedAtListBulk(subdomainHash)
subIds = listIds.filter(excludedSubIds)
excludedDomIds <- if (subIds.isEmpty)
isSiteExcludedAtListBulk(domainHash)
else
Future.successful(Set.empty)
domIds = listIds.filter(excludedDomIds)
} yield
if (subIds.nonEmpty)
s"its subdomain exclusion with results: ${subIds}"
else if (domIds.nonEmpty)
s"its domain exclusion with results: ${domIds}"
else
"no exclusion"
也许是这样的?
subdomainFutures.map(_.flatten).flatMap {
case sds if (sds.nonEmpty) => Future.successful(sds -> Nil)
case _ => domainFutures.map(_.flatten).map(Nil -> _)
}.map {
case (sds, _) if (sds.nonEmpty) => s"subdomain exclusion $sds"
case (_, ds) if (ds.nonEmpty) => s"domain exclusion $ds"
case _ => "no exclusion"
}
或者,也许,将域查询也提升到同一级别:
subdomainFutures.zip(domainFutures)
.map { case (s,d) = (s.flatten, d.flatten) }
.map {
case (sds, _) if (sds.nonEmpty) => s"subdomain exclusion $sds"
case (_, ds) if (ds.nonEmpty) => s"domain exclusion $ds"
case _ => "no exclusion"
}
我认为,这或多或少与您所做的相同,只是在 IMO 中以更直接的方式表达。
一个缺点是它会等待所有子域查询返回,即使第一个 returns 结果(第二个变体看起来有点“光滑”,但它也会等待所有域查询无条件地,这是一个额外的低效率)。
有很多方法可以优化它(没有什么是不可能的!)但我想不出任何对我来说对用例来说看起来不会过于复杂的方法。