使用功能结构的组合批处理

Compositional batching using functional constructs

我正在尝试创建组合批处理机制。

流程必须是。

  1. s: Map[BatchFragmentId, FragmentRequest]
  2. 的形式累加状态
  3. Map[BatchFragmentId, FragmentResponse]
  4. 的形式计算结果
  5. 对某些结果 R 执行一系列形式为 f: Map[BatchFragmentId, FragmentResponse] => R 的预定义计算。

我的解决方案

Batch[R] 为一个单子,其中包含一些累积状态 s 和一些函数 f.

map[B](g: R => B) 非常有意义,因为它只是自包含函数 f (Batch.make(f=f.andThen(g), s=s).

的简单组合

flatMap[B](f: R => Batch[B])手上没有多大意义,因为R需要计算完成,因此Batch[B]此时不存在,因为s执行响应计算时必须包含 Batch[B] 的状态。

修改flatMap

def combine[B, C](that: Batch[B])(t: (A, B) => C): Batch[C] = {
  val combined: Map[BatchFragmentId, FragmentRequest] = that.state ++ state
  val g: Map[BatchFragmentId, FragmentResponse] => C = 
    (r: Map[BatchFragmentId, FragmentResponse]) => t(f(r), that.f(r))
  Batch.make(combined, g)
}

这与 Scala 的 for comprehensions 配合得不好,而且它的可读性比 for comprehension 下降得更快。

val b: Batch[String] = ???
val b2: Batch[String] = ???
val b3: Batch[Int] = ???
val combined: Batch[(String, String)] = 
  (b.combine(b2){ case (s1, s2) => s1 + s2 })
  .combine(b3){ case (s1, i1) => s1 + i1.toString }

分几批可能会变得非常混乱。能够组成这些批次的理想方式是。

val o: Batch[(String, String)] = for {
  _ <- put("key", "42")
  x1 <- get("key")
  _ <- put("key2", "24")
  x2 <- get("key2")
} yield (x1, x2)

但任何优雅的语法都是一种解决方案。

我不太精通范畴论,并且在编写这种形式的代码方面经验有限,所以我不确定类要研究什么类型来解决这个特定问题。

对于这样的问题,这是否是正确的方法?我是否适当地对问题建模?这个问题是否已经得到很好的理解和概括?

我也养猫。

在挖掘了 cat 提供的各种类型类后,我发现 Applicative 非常适合这个问题。

对于任何对结果感兴趣的人,我最终得到以下结果。

type BatchFragmentId = String
trait FragmentRequest
trait FragmentResponse
type S = Map[BatchFragmentId, FragmentRequest]

object Batch {
  def make[A](s: S, f: Map[BatchFragmentId, FragmentResponse] => A): Batch[A] = Batch[A](
   transformer = f,
   state = s
  )
}

implicit object BatchApplicative extends Applicative[Batch] {
  override def pure[A](x: A): Batch[A] = Batch[A](
    transformer = _ => x,
    state =  Map.empty
  )

  override def ap[A, B](ff: Batch[A => B])(fa: Batch[A]): Batch[B] = {
    val f: Map[String, FragmentResponse] => B = (r: Map[BatchFragmentId, FragmentResponse]) => ff.transformer(r)(fa.transformer(r))
    Batch[B](
      transformer = f,
      state =  ff.state ++ fa.state
    )
  }
}

case class Batch[A] (
                      transformer: Map[BatchFragmentId, FragmentResponse] => A,
                      state: S
                    )

implicit class BatchOps[A](fa: Batch[A])(implicit A: Applicative[Batch]) {
  def <>[B](fb: Batch[B]): Batch[(A, B)] = A.product(fa, fb)
}

// Example
case class SomeFragmentRequest(x: Int) extends FragmentRequest
case class SomeFragmentResponse(x: Int) extends FragmentResponse

val id: BatchFragmentId = "some-id"
val exampleBatch = Batch.make[SomeFragmentResponse](
  s = Map(id -> SomeFragmentRequest(42)),
  f = responses => responses.get(id).map{ case x: SomeFragmentResponse => x }.get
)

// Example of composition
val o: Batch[(SomeFragmentResponse, SomeFragmentResponse)] = exampleBatch <> exampleBatch
// N arity
val o2: Batch[(SomeFragmentResponse, SomeFragmentResponse, SomeFragmentResponse, SomeFragmentResponse)] =
  Applicative[Batch].tuple4(exampleBatch, exampleBatch, exampleBatch, exampleBatch)

def run(s: S): Map[BatchFragmentId, FragmentResponse] = ???
def resolve[R](r: Map[BatchFragmentId, FragmentResponse], b: Batch[R]): R = b.transformer(r)

val res: (SomeFragmentResponse, SomeFragmentResponse) = resolve(run(o.state), o)