如何创建递归生成项目的 Akka Stream Source

How to create an Akka Stream Source that generates items recursively

我正在尝试弄清楚如何创建生成许多 Seq[Int] 的 Akka Streams 源。

基本上,给定一个 int n 我想生成 1 到 n

的所有 Seq[Int]

下面是执行此操作的一些代码:

def combinations(n: Int): Seq[Seq[Int]] = {
    def loop(acc: (Seq[Int], Seq[Seq[Int]]),
             remaining: Seq[Int]): Seq[Seq[Int]] = {
      remaining match {
        case s if s.size == 1 => {
          val total: Seq[Seq[Int]] = acc._2
          val current: Seq[Int] = acc._1
          total :+ (current :+ s.head)
        }
        case _ => {
          for {
            x <- remaining
            comb <- loop((acc._1 :+ x, acc._2), remaining.filter(_ != x))
          } yield comb
        }
      }
    }

    loop((Seq(), Seq()), (1 to n))
  }

这在 10 以内都可以正常工作...然后由于内存不足而崩溃。因为我只想处理它们中的每一个而不需要将它们全部保存在内存中,所以我想...... Akka Streams。但是我不知道如何将它变成一个产生每个组合的源,以便我可以处理它们。基本上在它附加到总计的地方我会在流中生成另一个项目。

这是一个使用 Johnson-Trotter 算法进行排列的解决方案。 tcopermutations 创建一个可以根据需要进行评估的 LazyList。如需更多排列,只需将不同的值传递给 printNIterations.

之所以使用Johnson-Trotter算法,是因为它打破了排列查找算法的递归结构。这对于能够评估排列的连续实例并将它们存储在某种惰性列表或流中很重要。

object PermutationsTest {
  def main(args: Array[String]) = {
    printNIterations(50, tcopermutations(5).iterator)
  }

  def printNIterations(n: Int, it: Iterator[Seq[Int]]): Unit = {
    if (n<=0) ()
    else {
      if (it.hasNext) {
        println(it.next())
        printNIterations(n - 1, it)
      } else ()
    }

  }

  def naivepermutations(n: Int): Seq[Seq[Int]] = {
    def loop(acc: Seq[Int], remaining: Seq[Int]): Seq[Seq[Int]] = {
      remaining match {
        case s if s.size == 1 => {
          val current: Seq[Int] = acc
          Seq((current :+ s.head))
        }
        case _ => {
          for {
            x <- remaining
            comb <- loop(acc :+ x, remaining.filter(_ != x))
          } yield comb
        }
      }
    }

    loop(Seq(), (1 to n))
  }

  def tcopermutations(n: Int): LazyList[Seq[Int]] = {
    val start = (1 to n).map(Element(_, Left))

    def loop(v: Seq[Element]): LazyList[Seq[Element]] = {
      johnsonTrotter(v) match {
        case Some(s) => v #:: loop(s)
        case None => LazyList(v)
      }
    }
    loop(start).map(_.map(_.i))
  }

  def checkIfMobile(seq: Seq[Element], i: Int): Boolean = {
    val e = seq(i)

    def getAdjacent(s: Seq[Element], d: Direction, j: Int): Int = {
      val adjacentIndex = d match {
        case Left => j - 1
        case Right => j + 1
      }
      s(adjacentIndex).i
    }

    if (e.direction == Left && i == 0) false
    else if (e.direction == Right && i == seq.size - 1) false
    else if (getAdjacent(seq, e.direction, i) < e.i) true
    else false
  }

  def findLargestMobile(seq: Seq[Element]): Option[Int] = {
    val mobiles = (0 until seq.size).filter{j => checkIfMobile(seq, j)}
    if (mobiles.isEmpty) None
    else {
      val folded = mobiles.map(x=>(x,seq(x).i)).foldLeft(None: Option[(Int, Int)]){ case (acc, elem) =>
        acc match {
          case None => Some(elem)
          case Some((i, value)) => if (value > elem._2) Some((i, value)) else Some(elem)
        }
      }
      folded.map(_._1)
    }
  }

  def swapLargestMobile(seq: Seq[Element], index: Int): (Seq[Element], Int) = {
    val dir = seq(index).direction
    val value = seq(index).i
    dir match {
      case Right =>
        val folded = seq.foldLeft((None, Seq()): (Option[Element], Seq[Element])){(acc, elem) =>
          val matched = elem.i == value
          val newAccOpt = if (matched) Some(elem) else None
          val newAccSeq = acc._1 match {
            case Some(swapMe) => acc._2 :+ elem :+ swapMe
            case None => if (matched) acc._2 else acc._2 :+ elem
          }
          (newAccOpt, newAccSeq)
        }
        (folded._2, index + 1)
      case Left =>
        val folded = seq.foldRight((None, Seq()): (Option[Element], Seq[Element])){(elem, acc) =>
          val matched = elem.i == value
          val newAccOpt = if (matched) Some(elem) else None
          val newAccSeq = acc._1 match {
            case Some(swapMe) => swapMe +: elem +: acc._2
            case None => if (matched) acc._2 else elem +: acc._2
          }
          (newAccOpt, newAccSeq)
        }
        (folded._2, index - 1)
    }
  }

  def revDirLargerThanMobile(seq: Seq[Element], mobile: Int) = {
    def reverse(e: Element) = {
      e.direction match {
        case Left => Element(e.i, Right)
        case Right => Element(e.i, Left)
      }
    }
    seq.map{ elem =>
      if (elem.i > seq(mobile).i) reverse(elem)
      else elem
    }
  }

  def johnsonTrotter(curr: Seq[Element]): Option[Seq[Element]] = {
    findLargestMobile(curr).map { m =>
      val (swapped, newMobile) = swapLargestMobile(curr, m)
      revDirLargerThanMobile(swapped, newMobile)
    }
  }

  trait Direction
  case object Left extends Direction
  case object Right extends Direction

  case class Element(i: Int, direction: Direction)
}