如何使用封装的源和接收器测试 akka 流闭合形状可运行图
How to test an akka stream closed shape runnable graph with an encapsulated source and sink
我创建了一个 akka 流,其中传递了一个处理函数和一个错误处理函数。 Source
和 Sink
完全封装在 ClosedShape
RunnableFlow
中。我的意图是通过流程将项目传递给父 class 和 运行。在我进行测试之前,这一切似乎都有效。我正在使用 scala-test 并在进程函数和错误处理函数中传递附加到列表。我随机生成错误以查看错误处理程序函数的内容。问题是,如果我将 100 个项目传递给父级 class,那么我希望错误函数中的项目列表和过程函数中的项目列表加起来为 100。因为 Source 和 Sink 完全封装 我没有明确的方法告诉测试等待,它会在所有项目通过流处理之前到达 assertion/should 语句。我创建了 this gist 来描述流。
下面是上述要点的示例测试:
import akka.actor._
import akka.stream._
import akka.testkit._
import org.scalatest._
class TestSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
with WordSpecLike with Matchers with BeforeAndAfterAll {
def this() = this(ActorSystem("TestSpec"))
override def afterAll = {
Thread.sleep(500)
mat.shutdown()
TestKit.shutdownActorSystem(system)
}
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true))
"TestSpec" must {
"handle messages" in {
val testStream = new Testing() // For Testing class see gist: https://gist.github.com/leftofnull/3e4c2a6b18fe71d219b6
(1 to 100).map(n => testStream.processString(s"${n}${n * 2}${n * 4}${n * 8}")) // Give it 100 strings to chew on
testStream.errors.size should not be (0) // passes
testStream.processed.size should not be (0) // passes
(testStream.processed.size + testStream.errors.size) should be (100) // fails due to checking before all items are processed
}
}
}
根据 Viktor Klang 在链接的 Gist 上的评论。这被证明是一个很好的解决方案:
def consume(
errorHandler: BadData => Unit, fn: Data => Unit, a: String
): RunnableGraph[Future[akka.Done]] = RunnableGraph.fromGraph(
GraphDSL.create(Sink.foreach[BadData](errorHandler)) { implicit b: GraphDSL.Builder[Unit] => sink =>
import GraphDSL.Implicits._
val source = b.add(Source.single(a))
val broadcast = b.add(Broadcast[String](2))
val merge = b.add(Zip[String, String])
val process = new ProcessorFlow(fn)
val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
(input: Xor[BadData, Data]) =>
input.swap.getOrElse((new Throwable, ("", "")))
))
source ~> broadcast.in
broadcast.out(0) ~> Flow[String].map(_.reverse) ~> merge.in0
broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
merge.out ~> process ~> failed ~> errors ~> sink
ClosedShape
}
)
这允许我在 RunnableGraph 上 Await.result
进行测试。再次感谢 Viktor 提供此解决方案!
我创建了一个 akka 流,其中传递了一个处理函数和一个错误处理函数。 Source
和 Sink
完全封装在 ClosedShape
RunnableFlow
中。我的意图是通过流程将项目传递给父 class 和 运行。在我进行测试之前,这一切似乎都有效。我正在使用 scala-test 并在进程函数和错误处理函数中传递附加到列表。我随机生成错误以查看错误处理程序函数的内容。问题是,如果我将 100 个项目传递给父级 class,那么我希望错误函数中的项目列表和过程函数中的项目列表加起来为 100。因为 Source 和 Sink 完全封装 我没有明确的方法告诉测试等待,它会在所有项目通过流处理之前到达 assertion/should 语句。我创建了 this gist 来描述流。
下面是上述要点的示例测试:
import akka.actor._
import akka.stream._
import akka.testkit._
import org.scalatest._
class TestSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
with WordSpecLike with Matchers with BeforeAndAfterAll {
def this() = this(ActorSystem("TestSpec"))
override def afterAll = {
Thread.sleep(500)
mat.shutdown()
TestKit.shutdownActorSystem(system)
}
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true))
"TestSpec" must {
"handle messages" in {
val testStream = new Testing() // For Testing class see gist: https://gist.github.com/leftofnull/3e4c2a6b18fe71d219b6
(1 to 100).map(n => testStream.processString(s"${n}${n * 2}${n * 4}${n * 8}")) // Give it 100 strings to chew on
testStream.errors.size should not be (0) // passes
testStream.processed.size should not be (0) // passes
(testStream.processed.size + testStream.errors.size) should be (100) // fails due to checking before all items are processed
}
}
}
根据 Viktor Klang 在链接的 Gist 上的评论。这被证明是一个很好的解决方案:
def consume(
errorHandler: BadData => Unit, fn: Data => Unit, a: String
): RunnableGraph[Future[akka.Done]] = RunnableGraph.fromGraph(
GraphDSL.create(Sink.foreach[BadData](errorHandler)) { implicit b: GraphDSL.Builder[Unit] => sink =>
import GraphDSL.Implicits._
val source = b.add(Source.single(a))
val broadcast = b.add(Broadcast[String](2))
val merge = b.add(Zip[String, String])
val process = new ProcessorFlow(fn)
val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
(input: Xor[BadData, Data]) =>
input.swap.getOrElse((new Throwable, ("", "")))
))
source ~> broadcast.in
broadcast.out(0) ~> Flow[String].map(_.reverse) ~> merge.in0
broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
merge.out ~> process ~> failed ~> errors ~> sink
ClosedShape
}
)
这允许我在 RunnableGraph 上 Await.result
进行测试。再次感谢 Viktor 提供此解决方案!