不同类型任务的功能组合 - Scala

Functional composition of different types of Tasks - Scala

目前,我正在设计一个在 Scala 中构建通用管道的设计(纯粹用于学习目的)。为此,我从一个基本构造 Task 开始,它采用一些 TaskConfiguration(现在,我们可以假设这个 TaskConfiguration 是一个特定于 Task 功能的案例 class)。 Trait结构如下:

trait Task[T <: TaskConfiguration] {
  type Out

  def taskConfiguration: T
  def execute(previousOutput: Option[Out]): Option[Out]
}

要求: 1. 我可以有多个扩展任务特征的任务。比如,ReadTask、WriteTask 等, 2. 每个任务都有自己的 "out"

类型

我的问题是:给定一个 List[Task],我如何编写要执行的方法调用。尝试了多种方式来组合它们,但我一直遇到无法区分上一个任务结束和当前任务结束的问题,因为我只有一个类型成员来指定该任务可以处理的内容。

我希望我们可以用 Scala 解决这个问题。但考虑到我对使用 Scala 进行函数式编程还很陌生,所以我无法弄明白。非常感谢您。

注意:这个问题的描述可能有点断章取义。但这是目前我能想到的最好的。如果您认为我们可以做得更好,请随意编辑它。如果您觉得这个问题没有任何意义,请在评论中提出您的想法,以便我解决。

您可以使用与 Scala 函数中的 andThen 类似的模式。

我编译了一个小例子:


import scala.util.{Try, Success, Failure}

type TaskConfiguration = Any

trait Task[-C <: TaskConfiguration, +O <: TaskConfiguration] {

  def execute(configuration: C): Option[O]

  def andThen[O2 <: TaskConfiguration](secondTask: Task[O, O2]): Task[C, O2] = {
    val firstTask = this

    new Task[C, O2] {
       def execute(configuration: C): Option[O2] =
         firstTask.execute(configuration).flatMap(secondTask.execute(_))
    }
  }
}

// From here on it's the example!

case class UnparsedNumber(value: String)

trait ParsedNumber {
  val value: Int
}

case class ParsedPositiveNumber(int: Int) extends ParsedNumber {
  val value: Int = int
}

case class HumanReadableNumber(value: String)


val task1 = new Task[UnparsedNumber, ParsedPositiveNumber] {
  def execute(configuration: UnparsedNumber): Option[ParsedPositiveNumber] = {
    Try(configuration.value.toInt) match {
      case Success(i) if i >= 0 => Some(ParsedPositiveNumber(i))
      case Success(_) => None
      case Failure(_) => None
    }
  }
}

val task2 = new Task[ParsedNumber, HumanReadableNumber] {
  def execute(configuration: ParsedNumber): Option[HumanReadableNumber] = {
    if(configuration.value < 1000 && configuration.value > -1000)
      Some(HumanReadableNumber(s"The number is $configuration"))
    else
      None
  }
}

val combined = task1.andThen(task2)

println(combined.execute(UnparsedNumber("12")))
println(combined.execute(UnparsedNumber("12x")))
println(combined.execute(UnparsedNumber("-12")))
println(combined.execute(UnparsedNumber("10000")))
println(combined.execute(UnparsedNumber("-10000")))

Try it out!


编辑:

关于您的评论,这种方法可能更符合您的要求:

case class Task[-C, +O](f: C => Option[O]) {

  def execute(c: C): Option[O] = f.apply(c)
}

case class TaskChain[C, O <: C](tasks: List[Task[C, O]]) {

  def run(initial: C): Option[O] = {

    def runTasks(output: Option[C], tail: List[Task[C, O]]): Option[O] = {
      output match {
        case Some(o) => tail match {
          case head :: Nil => head.execute(o)
          case head :: tail => runTasks(head.execute(o), tail)
          case Nil => ??? // This should never happen!
        }
        case None => None
      }
    }

    runTasks(Some(initial), tasks)
  }
}

// Example below:

val t1: Task[Int, Int] = Task(i => Some(i * 2))
val t2: Task[Int, Int] = Task(i => Some(i - 100))
val t3: Task[Int, Int] = Task(i => if(i > 0) Some(i) else None)


val chain: TaskChain[Int, Int] = TaskChain(List(t1, t2, t3))

println(chain.run(100))
println(chain.run(10))

Try it out!

引用:

您需要了解的是,如果您将 Task 打包在 List[Task] 中并将其用作 Task 链,则输出必须至少为输入的子类型。 C <: TaskConfigurationO <: C 导致: O <: C <: TaskConfiguration 这也意味着 O <: TaskConfiguration.


如果您不明白其中的任何部分,我很乐意进一步解释。

希望对您有所帮助。

我建议您看一下 cats and free monads 可以为您提供什么。按照这种方法,我将开始定义用于定义管道程序的 ADT。像 :

trait TaskE[Effect]
case class ReadTask[Input, SourceConfig](source: SourceConfig) extends TaskE[Input]
case class WriteTask[Output, SinkConfig](out: Output, sink: SinkConfig) extends TaskE[Unit]

然后应用 Free monads(如上文所述 link)来定义您的管道流。类似于:

val pipeline: Task[Unit] = 
  for {
    input1 <- read(source1)
    input2 <- read(source2)
    _      <- write(input1 + input2, sink1)
  } yield ()

现在它将取决于编译器(它是一个自然转换,描述了如何从 Task[A] 转换为 F[A],而 F 可能是 Id,Try, Future, ...) 你定义这个程序如何 运行:

val myCompiler: Task ~> Id = ???
val tryCompiler: Task ~> Try = ???

pipeline.foldMap(myCompiler)  // Id[Unit]
pipeline.foldMap(tryCompiler) // Try[Unit]

您可以根据需要拥有任意数量的 'compilers',这并不意味着要更改您的管道 ('program') 定义。