使用功能结构的组合批处理
Compositional batching using functional constructs
我正在尝试创建组合批处理机制。
流程必须是。
- 以
s: Map[BatchFragmentId, FragmentRequest]
的形式累加状态
- 以
Map[BatchFragmentId, FragmentResponse]
的形式计算结果
- 对某些结果 R 执行一系列形式为
f: Map[BatchFragmentId, FragmentResponse] => R
的预定义计算。
我的解决方案
设 Batch[R]
为一个单子,其中包含一些累积状态 s
和一些函数 f
.
map[B](g: R => B)
非常有意义,因为它只是自包含函数 f
(Batch.make(f=f.andThen(g), s=s)
.
的简单组合
flatMap[B](f: R => Batch[B])
手上没有多大意义,因为R
需要计算完成,因此Batch[B]
此时不存在,因为s
执行响应计算时必须包含 Batch[B]
的状态。
修改flatMap
:
def combine[B, C](that: Batch[B])(t: (A, B) => C): Batch[C] = {
val combined: Map[BatchFragmentId, FragmentRequest] = that.state ++ state
val g: Map[BatchFragmentId, FragmentResponse] => C =
(r: Map[BatchFragmentId, FragmentResponse]) => t(f(r), that.f(r))
Batch.make(combined, g)
}
这与 Scala 的 for comprehensions 配合得不好,而且它的可读性比 for comprehension 下降得更快。
val b: Batch[String] = ???
val b2: Batch[String] = ???
val b3: Batch[Int] = ???
val combined: Batch[(String, String)] =
(b.combine(b2){ case (s1, s2) => s1 + s2 })
.combine(b3){ case (s1, i1) => s1 + i1.toString }
分几批可能会变得非常混乱。能够组成这些批次的理想方式是。
val o: Batch[(String, String)] = for {
_ <- put("key", "42")
x1 <- get("key")
_ <- put("key2", "24")
x2 <- get("key2")
} yield (x1, x2)
但任何优雅的语法都是一种解决方案。
我不太精通范畴论,并且在编写这种形式的代码方面经验有限,所以我不确定类要研究什么类型来解决这个特定问题。
对于这样的问题,这是否是正确的方法?我是否适当地对问题建模?这个问题是否已经得到很好的理解和概括?
我也养猫。
在挖掘了 cat 提供的各种类型类后,我发现 Applicative
非常适合这个问题。
对于任何对结果感兴趣的人,我最终得到以下结果。
type BatchFragmentId = String
trait FragmentRequest
trait FragmentResponse
type S = Map[BatchFragmentId, FragmentRequest]
object Batch {
def make[A](s: S, f: Map[BatchFragmentId, FragmentResponse] => A): Batch[A] = Batch[A](
transformer = f,
state = s
)
}
implicit object BatchApplicative extends Applicative[Batch] {
override def pure[A](x: A): Batch[A] = Batch[A](
transformer = _ => x,
state = Map.empty
)
override def ap[A, B](ff: Batch[A => B])(fa: Batch[A]): Batch[B] = {
val f: Map[String, FragmentResponse] => B = (r: Map[BatchFragmentId, FragmentResponse]) => ff.transformer(r)(fa.transformer(r))
Batch[B](
transformer = f,
state = ff.state ++ fa.state
)
}
}
case class Batch[A] (
transformer: Map[BatchFragmentId, FragmentResponse] => A,
state: S
)
implicit class BatchOps[A](fa: Batch[A])(implicit A: Applicative[Batch]) {
def <>[B](fb: Batch[B]): Batch[(A, B)] = A.product(fa, fb)
}
// Example
case class SomeFragmentRequest(x: Int) extends FragmentRequest
case class SomeFragmentResponse(x: Int) extends FragmentResponse
val id: BatchFragmentId = "some-id"
val exampleBatch = Batch.make[SomeFragmentResponse](
s = Map(id -> SomeFragmentRequest(42)),
f = responses => responses.get(id).map{ case x: SomeFragmentResponse => x }.get
)
// Example of composition
val o: Batch[(SomeFragmentResponse, SomeFragmentResponse)] = exampleBatch <> exampleBatch
// N arity
val o2: Batch[(SomeFragmentResponse, SomeFragmentResponse, SomeFragmentResponse, SomeFragmentResponse)] =
Applicative[Batch].tuple4(exampleBatch, exampleBatch, exampleBatch, exampleBatch)
def run(s: S): Map[BatchFragmentId, FragmentResponse] = ???
def resolve[R](r: Map[BatchFragmentId, FragmentResponse], b: Batch[R]): R = b.transformer(r)
val res: (SomeFragmentResponse, SomeFragmentResponse) = resolve(run(o.state), o)
我正在尝试创建组合批处理机制。
流程必须是。
- 以
s: Map[BatchFragmentId, FragmentRequest]
的形式累加状态
- 以
Map[BatchFragmentId, FragmentResponse]
的形式计算结果
- 对某些结果 R 执行一系列形式为
f: Map[BatchFragmentId, FragmentResponse] => R
的预定义计算。
我的解决方案
设 Batch[R]
为一个单子,其中包含一些累积状态 s
和一些函数 f
.
map[B](g: R => B)
非常有意义,因为它只是自包含函数 f
(Batch.make(f=f.andThen(g), s=s)
.
flatMap[B](f: R => Batch[B])
手上没有多大意义,因为R
需要计算完成,因此Batch[B]
此时不存在,因为s
执行响应计算时必须包含 Batch[B]
的状态。
修改flatMap
:
def combine[B, C](that: Batch[B])(t: (A, B) => C): Batch[C] = {
val combined: Map[BatchFragmentId, FragmentRequest] = that.state ++ state
val g: Map[BatchFragmentId, FragmentResponse] => C =
(r: Map[BatchFragmentId, FragmentResponse]) => t(f(r), that.f(r))
Batch.make(combined, g)
}
这与 Scala 的 for comprehensions 配合得不好,而且它的可读性比 for comprehension 下降得更快。
val b: Batch[String] = ???
val b2: Batch[String] = ???
val b3: Batch[Int] = ???
val combined: Batch[(String, String)] =
(b.combine(b2){ case (s1, s2) => s1 + s2 })
.combine(b3){ case (s1, i1) => s1 + i1.toString }
分几批可能会变得非常混乱。能够组成这些批次的理想方式是。
val o: Batch[(String, String)] = for {
_ <- put("key", "42")
x1 <- get("key")
_ <- put("key2", "24")
x2 <- get("key2")
} yield (x1, x2)
但任何优雅的语法都是一种解决方案。
我不太精通范畴论,并且在编写这种形式的代码方面经验有限,所以我不确定类要研究什么类型来解决这个特定问题。
对于这样的问题,这是否是正确的方法?我是否适当地对问题建模?这个问题是否已经得到很好的理解和概括?
我也养猫。
在挖掘了 cat 提供的各种类型类后,我发现 Applicative
非常适合这个问题。
对于任何对结果感兴趣的人,我最终得到以下结果。
type BatchFragmentId = String
trait FragmentRequest
trait FragmentResponse
type S = Map[BatchFragmentId, FragmentRequest]
object Batch {
def make[A](s: S, f: Map[BatchFragmentId, FragmentResponse] => A): Batch[A] = Batch[A](
transformer = f,
state = s
)
}
implicit object BatchApplicative extends Applicative[Batch] {
override def pure[A](x: A): Batch[A] = Batch[A](
transformer = _ => x,
state = Map.empty
)
override def ap[A, B](ff: Batch[A => B])(fa: Batch[A]): Batch[B] = {
val f: Map[String, FragmentResponse] => B = (r: Map[BatchFragmentId, FragmentResponse]) => ff.transformer(r)(fa.transformer(r))
Batch[B](
transformer = f,
state = ff.state ++ fa.state
)
}
}
case class Batch[A] (
transformer: Map[BatchFragmentId, FragmentResponse] => A,
state: S
)
implicit class BatchOps[A](fa: Batch[A])(implicit A: Applicative[Batch]) {
def <>[B](fb: Batch[B]): Batch[(A, B)] = A.product(fa, fb)
}
// Example
case class SomeFragmentRequest(x: Int) extends FragmentRequest
case class SomeFragmentResponse(x: Int) extends FragmentResponse
val id: BatchFragmentId = "some-id"
val exampleBatch = Batch.make[SomeFragmentResponse](
s = Map(id -> SomeFragmentRequest(42)),
f = responses => responses.get(id).map{ case x: SomeFragmentResponse => x }.get
)
// Example of composition
val o: Batch[(SomeFragmentResponse, SomeFragmentResponse)] = exampleBatch <> exampleBatch
// N arity
val o2: Batch[(SomeFragmentResponse, SomeFragmentResponse, SomeFragmentResponse, SomeFragmentResponse)] =
Applicative[Batch].tuple4(exampleBatch, exampleBatch, exampleBatch, exampleBatch)
def run(s: S): Map[BatchFragmentId, FragmentResponse] = ???
def resolve[R](r: Map[BatchFragmentId, FragmentResponse], b: Batch[R]): R = b.transformer(r)
val res: (SomeFragmentResponse, SomeFragmentResponse) = resolve(run(o.state), o)