我应该如何测试 akka-streams RestartingSource 的使用
How should I test akka-streams RestartingSource usage
我正在开发一个应用程序,该应用程序有几个长 运行 流,它订阅有关某个实体的数据并处理该数据。这些流应该 24/7 全天候运行,因此我们需要处理故障(网络问题等)。
为此,我们将源代码封装在 RestartingSource
.
中
我现在正在尝试验证此行为,虽然它看起来可以正常工作,但我正在努力创建一个测试,在其中我推送一些数据,验证它是否正确处理,然后发送错误,然后确认它在那之后重新连接并继续处理。
我把它归结为这个最小的案例:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import akka.stream.testkit.TestPublisher
import org.scalatest.concurrent.Eventually
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
class MinimalSpec extends FlatSpec with Matchers with Eventually {
"restarting a failed source" should "be testable" in {
implicit val sys: ActorSystem = ActorSystem("akka-grpc-measurements-for-test")
implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = sys.dispatcher
val probe = TestPublisher.probe[Int]()
val restartingSource = RestartSource
.onFailuresWithBackoff(1 second, 1 minute, 0d) { () => Source.fromPublisher(probe) }
var last: Int = 0
val sink = Sink.foreach { l: Int => last = l }
restartingSource.runWith(sink)
probe.sendNext(1)
eventually {
last shouldBe 1
}
probe.sendNext(2)
eventually {
last shouldBe 2
}
probe.sendError(new RuntimeException("boom"))
probe.expectSubscription()
probe.sendNext(3)
eventually {
last shouldBe 3
}
}
}
此测试在最后一个 eventually
块 Last failure message: 2 was not equal to 3
上始终失败。我在这里错过了什么?
编辑:akka 版本是 2.5.31
我看了 TestPublisher
code 之后就明白了。它的订阅是 lazy val
。所以当RestartSource
检测到错误,再次执行工厂方法() => Source.fromPublisher(probe)
时,得到了一个新的Source
,但是probe
的subscription
仍然指向到旧 Source
。更改代码以初始化新的 Source
和 TestPublisher
都有效。
我正在开发一个应用程序,该应用程序有几个长 运行 流,它订阅有关某个实体的数据并处理该数据。这些流应该 24/7 全天候运行,因此我们需要处理故障(网络问题等)。
为此,我们将源代码封装在 RestartingSource
.
我现在正在尝试验证此行为,虽然它看起来可以正常工作,但我正在努力创建一个测试,在其中我推送一些数据,验证它是否正确处理,然后发送错误,然后确认它在那之后重新连接并继续处理。
我把它归结为这个最小的案例:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import akka.stream.testkit.TestPublisher
import org.scalatest.concurrent.Eventually
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
class MinimalSpec extends FlatSpec with Matchers with Eventually {
"restarting a failed source" should "be testable" in {
implicit val sys: ActorSystem = ActorSystem("akka-grpc-measurements-for-test")
implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = sys.dispatcher
val probe = TestPublisher.probe[Int]()
val restartingSource = RestartSource
.onFailuresWithBackoff(1 second, 1 minute, 0d) { () => Source.fromPublisher(probe) }
var last: Int = 0
val sink = Sink.foreach { l: Int => last = l }
restartingSource.runWith(sink)
probe.sendNext(1)
eventually {
last shouldBe 1
}
probe.sendNext(2)
eventually {
last shouldBe 2
}
probe.sendError(new RuntimeException("boom"))
probe.expectSubscription()
probe.sendNext(3)
eventually {
last shouldBe 3
}
}
}
此测试在最后一个 eventually
块 Last failure message: 2 was not equal to 3
上始终失败。我在这里错过了什么?
编辑:akka 版本是 2.5.31
我看了 TestPublisher
code 之后就明白了。它的订阅是 lazy val
。所以当RestartSource
检测到错误,再次执行工厂方法() => Source.fromPublisher(probe)
时,得到了一个新的Source
,但是probe
的subscription
仍然指向到旧 Source
。更改代码以初始化新的 Source
和 TestPublisher
都有效。