如何创建递归生成项目的 Akka Stream Source
How to create an Akka Stream Source that generates items recursively
我正在尝试弄清楚如何创建生成许多 Seq[Int] 的 Akka Streams 源。
基本上,给定一个 int n
我想生成 1 到 n
的所有 Seq[Int]
下面是执行此操作的一些代码:
def combinations(n: Int): Seq[Seq[Int]] = {
def loop(acc: (Seq[Int], Seq[Seq[Int]]),
remaining: Seq[Int]): Seq[Seq[Int]] = {
remaining match {
case s if s.size == 1 => {
val total: Seq[Seq[Int]] = acc._2
val current: Seq[Int] = acc._1
total :+ (current :+ s.head)
}
case _ => {
for {
x <- remaining
comb <- loop((acc._1 :+ x, acc._2), remaining.filter(_ != x))
} yield comb
}
}
}
loop((Seq(), Seq()), (1 to n))
}
这在 10 以内都可以正常工作...然后由于内存不足而崩溃。因为我只想处理它们中的每一个而不需要将它们全部保存在内存中,所以我想...... Akka Streams。但是我不知道如何将它变成一个产生每个组合的源,以便我可以处理它们。基本上在它附加到总计的地方我会在流中生成另一个项目。
这是一个使用 Johnson-Trotter 算法进行排列的解决方案。 tcopermutations
创建一个可以根据需要进行评估的 LazyList。如需更多排列,只需将不同的值传递给 printNIterations
.
之所以使用Johnson-Trotter算法,是因为它打破了排列查找算法的递归结构。这对于能够评估排列的连续实例并将它们存储在某种惰性列表或流中很重要。
object PermutationsTest {
def main(args: Array[String]) = {
printNIterations(50, tcopermutations(5).iterator)
}
def printNIterations(n: Int, it: Iterator[Seq[Int]]): Unit = {
if (n<=0) ()
else {
if (it.hasNext) {
println(it.next())
printNIterations(n - 1, it)
} else ()
}
}
def naivepermutations(n: Int): Seq[Seq[Int]] = {
def loop(acc: Seq[Int], remaining: Seq[Int]): Seq[Seq[Int]] = {
remaining match {
case s if s.size == 1 => {
val current: Seq[Int] = acc
Seq((current :+ s.head))
}
case _ => {
for {
x <- remaining
comb <- loop(acc :+ x, remaining.filter(_ != x))
} yield comb
}
}
}
loop(Seq(), (1 to n))
}
def tcopermutations(n: Int): LazyList[Seq[Int]] = {
val start = (1 to n).map(Element(_, Left))
def loop(v: Seq[Element]): LazyList[Seq[Element]] = {
johnsonTrotter(v) match {
case Some(s) => v #:: loop(s)
case None => LazyList(v)
}
}
loop(start).map(_.map(_.i))
}
def checkIfMobile(seq: Seq[Element], i: Int): Boolean = {
val e = seq(i)
def getAdjacent(s: Seq[Element], d: Direction, j: Int): Int = {
val adjacentIndex = d match {
case Left => j - 1
case Right => j + 1
}
s(adjacentIndex).i
}
if (e.direction == Left && i == 0) false
else if (e.direction == Right && i == seq.size - 1) false
else if (getAdjacent(seq, e.direction, i) < e.i) true
else false
}
def findLargestMobile(seq: Seq[Element]): Option[Int] = {
val mobiles = (0 until seq.size).filter{j => checkIfMobile(seq, j)}
if (mobiles.isEmpty) None
else {
val folded = mobiles.map(x=>(x,seq(x).i)).foldLeft(None: Option[(Int, Int)]){ case (acc, elem) =>
acc match {
case None => Some(elem)
case Some((i, value)) => if (value > elem._2) Some((i, value)) else Some(elem)
}
}
folded.map(_._1)
}
}
def swapLargestMobile(seq: Seq[Element], index: Int): (Seq[Element], Int) = {
val dir = seq(index).direction
val value = seq(index).i
dir match {
case Right =>
val folded = seq.foldLeft((None, Seq()): (Option[Element], Seq[Element])){(acc, elem) =>
val matched = elem.i == value
val newAccOpt = if (matched) Some(elem) else None
val newAccSeq = acc._1 match {
case Some(swapMe) => acc._2 :+ elem :+ swapMe
case None => if (matched) acc._2 else acc._2 :+ elem
}
(newAccOpt, newAccSeq)
}
(folded._2, index + 1)
case Left =>
val folded = seq.foldRight((None, Seq()): (Option[Element], Seq[Element])){(elem, acc) =>
val matched = elem.i == value
val newAccOpt = if (matched) Some(elem) else None
val newAccSeq = acc._1 match {
case Some(swapMe) => swapMe +: elem +: acc._2
case None => if (matched) acc._2 else elem +: acc._2
}
(newAccOpt, newAccSeq)
}
(folded._2, index - 1)
}
}
def revDirLargerThanMobile(seq: Seq[Element], mobile: Int) = {
def reverse(e: Element) = {
e.direction match {
case Left => Element(e.i, Right)
case Right => Element(e.i, Left)
}
}
seq.map{ elem =>
if (elem.i > seq(mobile).i) reverse(elem)
else elem
}
}
def johnsonTrotter(curr: Seq[Element]): Option[Seq[Element]] = {
findLargestMobile(curr).map { m =>
val (swapped, newMobile) = swapLargestMobile(curr, m)
revDirLargerThanMobile(swapped, newMobile)
}
}
trait Direction
case object Left extends Direction
case object Right extends Direction
case class Element(i: Int, direction: Direction)
}
我正在尝试弄清楚如何创建生成许多 Seq[Int] 的 Akka Streams 源。
基本上,给定一个 int n
我想生成 1 到 n
Seq[Int]
下面是执行此操作的一些代码:
def combinations(n: Int): Seq[Seq[Int]] = {
def loop(acc: (Seq[Int], Seq[Seq[Int]]),
remaining: Seq[Int]): Seq[Seq[Int]] = {
remaining match {
case s if s.size == 1 => {
val total: Seq[Seq[Int]] = acc._2
val current: Seq[Int] = acc._1
total :+ (current :+ s.head)
}
case _ => {
for {
x <- remaining
comb <- loop((acc._1 :+ x, acc._2), remaining.filter(_ != x))
} yield comb
}
}
}
loop((Seq(), Seq()), (1 to n))
}
这在 10 以内都可以正常工作...然后由于内存不足而崩溃。因为我只想处理它们中的每一个而不需要将它们全部保存在内存中,所以我想...... Akka Streams。但是我不知道如何将它变成一个产生每个组合的源,以便我可以处理它们。基本上在它附加到总计的地方我会在流中生成另一个项目。
这是一个使用 Johnson-Trotter 算法进行排列的解决方案。 tcopermutations
创建一个可以根据需要进行评估的 LazyList。如需更多排列,只需将不同的值传递给 printNIterations
.
之所以使用Johnson-Trotter算法,是因为它打破了排列查找算法的递归结构。这对于能够评估排列的连续实例并将它们存储在某种惰性列表或流中很重要。
object PermutationsTest {
def main(args: Array[String]) = {
printNIterations(50, tcopermutations(5).iterator)
}
def printNIterations(n: Int, it: Iterator[Seq[Int]]): Unit = {
if (n<=0) ()
else {
if (it.hasNext) {
println(it.next())
printNIterations(n - 1, it)
} else ()
}
}
def naivepermutations(n: Int): Seq[Seq[Int]] = {
def loop(acc: Seq[Int], remaining: Seq[Int]): Seq[Seq[Int]] = {
remaining match {
case s if s.size == 1 => {
val current: Seq[Int] = acc
Seq((current :+ s.head))
}
case _ => {
for {
x <- remaining
comb <- loop(acc :+ x, remaining.filter(_ != x))
} yield comb
}
}
}
loop(Seq(), (1 to n))
}
def tcopermutations(n: Int): LazyList[Seq[Int]] = {
val start = (1 to n).map(Element(_, Left))
def loop(v: Seq[Element]): LazyList[Seq[Element]] = {
johnsonTrotter(v) match {
case Some(s) => v #:: loop(s)
case None => LazyList(v)
}
}
loop(start).map(_.map(_.i))
}
def checkIfMobile(seq: Seq[Element], i: Int): Boolean = {
val e = seq(i)
def getAdjacent(s: Seq[Element], d: Direction, j: Int): Int = {
val adjacentIndex = d match {
case Left => j - 1
case Right => j + 1
}
s(adjacentIndex).i
}
if (e.direction == Left && i == 0) false
else if (e.direction == Right && i == seq.size - 1) false
else if (getAdjacent(seq, e.direction, i) < e.i) true
else false
}
def findLargestMobile(seq: Seq[Element]): Option[Int] = {
val mobiles = (0 until seq.size).filter{j => checkIfMobile(seq, j)}
if (mobiles.isEmpty) None
else {
val folded = mobiles.map(x=>(x,seq(x).i)).foldLeft(None: Option[(Int, Int)]){ case (acc, elem) =>
acc match {
case None => Some(elem)
case Some((i, value)) => if (value > elem._2) Some((i, value)) else Some(elem)
}
}
folded.map(_._1)
}
}
def swapLargestMobile(seq: Seq[Element], index: Int): (Seq[Element], Int) = {
val dir = seq(index).direction
val value = seq(index).i
dir match {
case Right =>
val folded = seq.foldLeft((None, Seq()): (Option[Element], Seq[Element])){(acc, elem) =>
val matched = elem.i == value
val newAccOpt = if (matched) Some(elem) else None
val newAccSeq = acc._1 match {
case Some(swapMe) => acc._2 :+ elem :+ swapMe
case None => if (matched) acc._2 else acc._2 :+ elem
}
(newAccOpt, newAccSeq)
}
(folded._2, index + 1)
case Left =>
val folded = seq.foldRight((None, Seq()): (Option[Element], Seq[Element])){(elem, acc) =>
val matched = elem.i == value
val newAccOpt = if (matched) Some(elem) else None
val newAccSeq = acc._1 match {
case Some(swapMe) => swapMe +: elem +: acc._2
case None => if (matched) acc._2 else elem +: acc._2
}
(newAccOpt, newAccSeq)
}
(folded._2, index - 1)
}
}
def revDirLargerThanMobile(seq: Seq[Element], mobile: Int) = {
def reverse(e: Element) = {
e.direction match {
case Left => Element(e.i, Right)
case Right => Element(e.i, Left)
}
}
seq.map{ elem =>
if (elem.i > seq(mobile).i) reverse(elem)
else elem
}
}
def johnsonTrotter(curr: Seq[Element]): Option[Seq[Element]] = {
findLargestMobile(curr).map { m =>
val (swapped, newMobile) = swapLargestMobile(curr, m)
revDirLargerThanMobile(swapped, newMobile)
}
}
trait Direction
case object Left extends Direction
case object Right extends Direction
case class Element(i: Int, direction: Direction)
}