Flow which transforms List[A] into List[B] with flattening, given another Flow which transforms A to List[B]
I have an "inner" flow which transforms A to List[B]. I want to create a flow which transforms List[A] to List[B] by invoking the "inner" flow on each element of each List[A] and then flattening the results.
For illustration, see the test case below (the dependencies are the latest scalatest and akka-stream; A here is String, B here is Char, and the "inner" flow is stringToCharacters).
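The test passes, but the implementation is not idiomatic Akka Streams code, because it materializes/runs a substream for every incoming list.
Please suggest a better, idiomatic implementation which doesn't involve running substreams.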
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.FlatSpec
import org.scalatest.Matchers._
import org.scalatest.concurrent.ScalaFutures.whenReady
import scala.concurrent.ExecutionContext.Implicits.global

class TestSpec extends FlatSpec {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  /* This is just for illustration and testing purposes, `flattenFlows` should support various inner flows */
  val stringToCharacters: Flow[String, List[Char], NotUsed] =
    Flow[String].map(x => x.toList)

  /* FIXME: I'm looking for a better implementation of the following function, please don't change the signature */
  def flattenFlows[A, B](innerFlow: Flow[A, List[B], NotUsed]): Flow[List[A], List[B], NotUsed] = {
    Flow[List[A]]
      .mapAsync(Int.MaxValue) { x =>
        Source(x).via(innerFlow).runWith(Sink.seq).map(_.flatten.toList)
      }
  }

  it should "flatten flows" in {
    val input = List(
      List("ab", "cd"),
      List("ef", "gh")
    )
    val result = Source(input).via(flattenFlows(stringToCharacters)).runWith(Sink.seq)
    val expected = List(
      List('a', 'b', 'c', 'd'),
      List('e', 'f', 'g', 'h')
    )
    whenReady(result) { x =>
      x.toList shouldEqual expected
    }
  }
}
You can use a combination of flatMapConcat and fold:
def flattenFlows[A, B](innerFlow: Flow[A, List[B], NotUsed]): Flow[List[A], List[B], NotUsed] =
  Flow[List[A]].flatMapConcat { x => Source(x).via(innerFlow).fold(List.empty[B])(_ ::: _) }
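To make the shape of that computation easier to see, here is a rough plain-collections analogue that needs no Akka dependency. It is only a sketch: flattenLists and FlattenSketch are hypothetical names, and the inner flow is replaced by an ordinary function A => List[B], which assumes the inner flow emits exactly one list per input element, as stringToCharacters does.

```scala
object FlattenSketch {
  // Hypothetical stand-in for the stream version: `inner` plays the role of innerFlow,
  // assuming it produces exactly one List[B] per A.
  def flattenLists[A, B](inner: A => List[B])(input: List[List[A]]): List[List[B]] =
    input.map { xs =>
      // Mirrors Source(xs).via(innerFlow).fold(List.empty[B])(_ ::: _):
      // apply the inner transformation to each element, then concatenate the results.
      xs.map(inner).foldLeft(List.empty[B])(_ ::: _)
    }

  def main(args: Array[String]): Unit = {
    val input = List(List("ab", "cd"), List("ef", "gh"))
    val result = flattenLists[String, Char](_.toList)(input)
    println(result) // prints List(List(a, b, c, d), List(e, f, g, h))
  }
}
```

One thing to keep in mind: acc ::: next copies the accumulated left-hand list on every step, so for outer elements containing many items it may be worth accumulating into a builder or prepending and reversing instead.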