使用 Akka HTTP 测试 Akka Persistence - 日志未清除
Testing Akka Persistence with Akka HTTP - journal is not cleared
我尝试将 Akka Persistence Test Kit 与 Akka HTTP Test Kit 一起使用,但我的内存日志在每次测试前都没有被清除。
非常简单的持久性行为 - 只需放入字符串并获取所有存储的字符串:
/** Event-sourced store that collects strings under one fixed persistence id. */
object MyStore {

  /** Messages accepted by the store. */
  sealed trait Command

  /** Persists `s` and replies with the stored values as they are after the update. */
  case class AddCmd(s: String, replyTo: ActorRef[List[String]]) extends Command

  /** Replies with the currently stored values; nothing is persisted. */
  case class ReadCmd(replyTo: ActorRef[List[String]]) extends Command

  /** Events written to the journal. */
  sealed trait Event
  case class AddEvent(s: String) extends Event

  /** State derived from the events; the most recently added value is at the head. */
  case class State(values: List[String] = List())

  /** Builds the behavior, wiring the two handlers defined below. */
  def apply(): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId("myId"),
      emptyState = State(),
      commandHandler = (current, cmd) => handleCommand(current, cmd),
      eventHandler = (current, evt) => handleEvent(current, evt)
    )

  /** Decides what to persist and what to reply for an incoming command. */
  def handleCommand(state: State, command: Command): ReplyEffect[Event, State] =
    command match {
      case ReadCmd(replyTo) =>
        Effect.reply(replyTo)(state.values)
      case AddCmd(s, replyTo) =>
        Effect.persist(AddEvent(s)).thenReply(replyTo)(_.values)
    }

  /** Applies a persisted event to the state by prepending the new value. */
  def handleEvent(state: State, event: Event): State =
    event match {
      case AddEvent(s) => State(s :: state.values)
    }
}
具有持久性和序列化配置的 Actor 系统配置:
/** Holds the actor-system configuration used by the spec. */
object MySpec {
  // Java serialization is switched on explicitly; every other setting falls back
  // to the configuration supplied by PersistenceTestKitPlugin.
  val configuration: Config =
    ConfigFactory
      .parseString("akka.actor.allow-java-serialization = on")
      .resolve()
      .withFallback(PersistenceTestKitPlugin.config)
}
我的测试类:
/** Route-level spec that talks to a MyStore actor through GET and POST requests. */
class MySpec extends AnyFunSuite with Matchers with ScalatestRouteTest with BeforeAndAfterEach {
  import MyStore._
  import akka.http.scaladsl.server.Directives._

  val persistenceTestKit: PersistenceTestKit = PersistenceTestKit(system)

  // GET returns the stored values joined with ";", POST stores the request body.
  val route: Route = {
    import akka.actor.typed.scaladsl.AskPattern._
    import akka.actor.typed.scaladsl.adapter._

    implicit val typedSystem: ActorSystem[Nothing] = system.toTyped
    implicit val timeout: Timeout = 3.seconds

    // Spawned once, when the route is built; every request is served by this same actor.
    val actor: ActorRef[Command] =
      system.spawn(behavior = MyStore(), name = "MyStore", props = Props.empty)

    concat(
      get {
        val joined = actor.ask(ref => ReadCmd(ref)).map(_.mkString(";"))
        complete(joined)
      },
      post {
        entity(as[String]) { newRecord =>
          val acknowledged = actor.ask(ref => AddCmd(newRecord, ref)).map(_ => "OK")
          complete(acknowledged)
        }
      }
    )
  }

  override def createActorSystem(): akka.actor.ActorSystem =
    akka.actor.ActorSystem("MySystem", MySpec.configuration)

  // NOTE: this only wipes the persistence test kit's storage. The actor spawned in
  // `route` is not restarted here, so whatever it already holds in memory is untouched.
  override def beforeEach(): Unit =
    persistenceTestKit.clearAll()

  /** Posts `s` to the route and expects the "OK" acknowledgement. */
  private def add(s: String) =
    Post("/", s) ~> route ~> check {
      responseAs[String] shouldEqual "OK"
    }

  /** Reads the store through the route and compares the joined values with `expected`. */
  private def assertStored(expected: String) =
    Get() ~> route ~> check {
      responseAs[String] shouldEqual expected
    }

  test("Add two elements") {
    Seq("One", "Two").foreach(add)
    assertStored("Two;One")
  }

  test("Add another two element") {
    Seq("Three", "Four").foreach(add)
    assertStored("Four;Three")
  }
}
如果我单独运行每个测试,一切正常。但如果我接连运行这两个测试,第二个测试中会得到:
Expected :"Four;Three[]"
Actual :"Four;Three[;Two;One]"
我的 build.sbt 文件:
// Project coordinates
name := "persistence-http-test"
version := "0.1"
scalaVersion := "2.13.6"

// Library versions
val AkkaVersion = "2.6.14"
val AkkaHttpVersion = "10.2.4"
val ScalatestVersion = "3.2.5"

// Helpers that build module ids in the com.typesafe.akka organisation
def typesafeAkka(artifact: String, revision: String) = "com.typesafe.akka" %% artifact % revision
def akka(artifact: String) = typesafeAkka(artifact, AkkaVersion)
def akkaHttp(artifact: String) = typesafeAkka(artifact, AkkaHttpVersion)

libraryDependencies ++= Seq(
  akka("akka-actor-typed"),
  akka("akka-stream-typed"),
  akkaHttp("akka-http"),
  akka("akka-persistence-typed"),
  "org.scalatest" %% "scalatest" % ScalatestVersion,
  akka("akka-stream-testkit"),
  akkaHttp("akka-http-testkit"),
  akka("akka-actor-testkit-typed"),
  akka("akka-persistence-testkit")
)
存储库:https://github.com/LukBed/akka-persistence-http-test-issue
在每次测试前执行 persistenceTestKit.clearAll() 时,持久性存储中的所有数据都会被删除,但 MyStore actor 的内存状态内容保持不变——因此后续测试失败。
另一个后果是持久性存储将与 actor 的状态不同步。为了数据的一致性,最好提供一套与 Add/Read 处理方式类似的 Clear 命令/事件处理例程:
/** Store protocol and handlers, extended with a clear command/event pair. */
object MyStore {
  // ...

  /** Messages accepted by the store. */
  sealed trait Command

  /** Persists `s` and replies with the values as they are after the update. */
  case class AddCmd(s: String, replyTo: ActorRef[List[String]]) extends Command

  /** Replies with the current values; nothing is persisted. */
  case class ReadCmd(replyTo: ActorRef[List[String]]) extends Command

  /** Persists a ClearEvent and replies with the values as they are after the update. */
  case class ClearCmd(replyTo: ActorRef[List[String]]) extends Command

  /** Events written to the journal. */
  sealed trait Event
  case class AddEvent(s: String) extends Event
  case object ClearEvent extends Event

  /** State derived from the events; the most recently added value is at the head. */
  case class State(values: List[String] = Nil)

  /** Decides what to persist and what to reply for an incoming command. */
  def handleCommand(state: State, command: Command): ReplyEffect[Event, State] =
    command match {
      case ReadCmd(replyTo) =>
        Effect.reply(replyTo)(state.values)
      case AddCmd(s, replyTo) =>
        Effect.persist(AddEvent(s)).thenReply(replyTo)(updated => updated.values)
      case ClearCmd(replyTo) =>
        Effect.persist(ClearEvent).thenReply(replyTo)(updated => updated.values)
    }

  /** Applies a persisted event: an add prepends its value, a clear empties the list. */
  def handleEvent(state: State, event: Event): State =
    event match {
      case ClearEvent  => State(Nil)
      case AddEvent(s) => State(s :: state.values)
    }
}
现在您可以在每次测试之前,通过 route ~> check 发送 put 请求,让 Clear 命令/事件处理程序清除 actor 的内部状态和持久性日志:
/** Spec excerpt (parts elided) that sends a clear request through the route before every test. */
class MySpec extends AnyFunSuite with Matchers with ScalatestRouteTest with BeforeAndAfterEach {
// ...
val route: Route = {
...
// GET asks the actor with ReadCmd and completes with the reply joined by ";".
get {
val result = actor.ask(replyTo => ReadCmd(replyTo)).map(_.mkString(";"))
complete(result)
} ~
// PUT asks the actor with ClearCmd and completes with the reply joined by ";".
put {
val result = actor.ask(replyTo => ClearCmd(replyTo)).map(_.mkString(";"))
complete(result)
} ~
// POST reads the request body as a String, sends it in AddCmd and completes with "OK".
post { entity(as[String]) { newRecord =>
val result = actor.ask(replyTo => AddCmd(newRecord, replyTo)).map(_ => "OK")
complete(result)
} }
}
// ...
// Runs the clear request ahead of each test.
override def beforeEach(): Unit = {
initStateAndJournal()
}
// Sends PUT / through the route and expects an empty response body.
private def initStateAndJournal() = {
Put("/", "clear") ~> route ~> check {
responseAs[String] shouldEqual ""
}
}
// ...
}
我尝试将 Akka Persistence Test Kit 与 Akka HTTP Test Kit 一起使用,但我的内存日志在每次测试前都没有被清除。
非常简单的持久性行为 - 只需放入字符串并获取所有存储的字符串:
/** Event-sourced store that collects strings under one fixed persistence id. */
object MyStore {

  /** Messages accepted by the store. */
  sealed trait Command

  /** Persists `s` and replies with the stored values as they are after the update. */
  case class AddCmd(s: String, replyTo: ActorRef[List[String]]) extends Command

  /** Replies with the currently stored values; nothing is persisted. */
  case class ReadCmd(replyTo: ActorRef[List[String]]) extends Command

  /** Events written to the journal. */
  sealed trait Event
  case class AddEvent(s: String) extends Event

  /** State derived from the events; the most recently added value is at the head. */
  case class State(values: List[String] = List())

  /** Builds the behavior, wiring the two handlers defined below. */
  def apply(): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId("myId"),
      emptyState = State(),
      commandHandler = (current, cmd) => handleCommand(current, cmd),
      eventHandler = (current, evt) => handleEvent(current, evt)
    )

  /** Decides what to persist and what to reply for an incoming command. */
  def handleCommand(state: State, command: Command): ReplyEffect[Event, State] =
    command match {
      case ReadCmd(replyTo) =>
        Effect.reply(replyTo)(state.values)
      case AddCmd(s, replyTo) =>
        Effect.persist(AddEvent(s)).thenReply(replyTo)(_.values)
    }

  /** Applies a persisted event to the state by prepending the new value. */
  def handleEvent(state: State, event: Event): State =
    event match {
      case AddEvent(s) => State(s :: state.values)
    }
}
具有持久性和序列化配置的 Actor 系统配置:
/** Holds the actor-system configuration used by the spec. */
object MySpec {
  // Java serialization is switched on explicitly; every other setting falls back
  // to the configuration supplied by PersistenceTestKitPlugin.
  val configuration: Config =
    ConfigFactory
      .parseString("akka.actor.allow-java-serialization = on")
      .resolve()
      .withFallback(PersistenceTestKitPlugin.config)
}
我的测试类:
/** Route-level spec that talks to a MyStore actor through GET and POST requests. */
class MySpec extends AnyFunSuite with Matchers with ScalatestRouteTest with BeforeAndAfterEach {
  import MyStore._
  import akka.http.scaladsl.server.Directives._

  val persistenceTestKit: PersistenceTestKit = PersistenceTestKit(system)

  // GET returns the stored values joined with ";", POST stores the request body.
  val route: Route = {
    import akka.actor.typed.scaladsl.AskPattern._
    import akka.actor.typed.scaladsl.adapter._

    implicit val typedSystem: ActorSystem[Nothing] = system.toTyped
    implicit val timeout: Timeout = 3.seconds

    // Spawned once, when the route is built; every request is served by this same actor.
    val actor: ActorRef[Command] =
      system.spawn(behavior = MyStore(), name = "MyStore", props = Props.empty)

    concat(
      get {
        val joined = actor.ask(ref => ReadCmd(ref)).map(_.mkString(";"))
        complete(joined)
      },
      post {
        entity(as[String]) { newRecord =>
          val acknowledged = actor.ask(ref => AddCmd(newRecord, ref)).map(_ => "OK")
          complete(acknowledged)
        }
      }
    )
  }

  override def createActorSystem(): akka.actor.ActorSystem =
    akka.actor.ActorSystem("MySystem", MySpec.configuration)

  // NOTE: this only wipes the persistence test kit's storage. The actor spawned in
  // `route` is not restarted here, so whatever it already holds in memory is untouched.
  override def beforeEach(): Unit =
    persistenceTestKit.clearAll()

  /** Posts `s` to the route and expects the "OK" acknowledgement. */
  private def add(s: String) =
    Post("/", s) ~> route ~> check {
      responseAs[String] shouldEqual "OK"
    }

  /** Reads the store through the route and compares the joined values with `expected`. */
  private def assertStored(expected: String) =
    Get() ~> route ~> check {
      responseAs[String] shouldEqual expected
    }

  test("Add two elements") {
    Seq("One", "Two").foreach(add)
    assertStored("Two;One")
  }

  test("Add another two element") {
    Seq("Three", "Four").foreach(add)
    assertStored("Four;Three")
  }
}
如果我单独运行每个测试,一切正常。但如果我接连运行这两个测试,第二个测试中会得到:
Expected :"Four;Three[]"
Actual :"Four;Three[;Two;One]"
我的 build.sbt 文件:
// Project coordinates
name := "persistence-http-test"
version := "0.1"
scalaVersion := "2.13.6"

// Library versions
val AkkaVersion = "2.6.14"
val AkkaHttpVersion = "10.2.4"
val ScalatestVersion = "3.2.5"

// Helpers that build module ids in the com.typesafe.akka organisation
def typesafeAkka(artifact: String, revision: String) = "com.typesafe.akka" %% artifact % revision
def akka(artifact: String) = typesafeAkka(artifact, AkkaVersion)
def akkaHttp(artifact: String) = typesafeAkka(artifact, AkkaHttpVersion)

libraryDependencies ++= Seq(
  akka("akka-actor-typed"),
  akka("akka-stream-typed"),
  akkaHttp("akka-http"),
  akka("akka-persistence-typed"),
  "org.scalatest" %% "scalatest" % ScalatestVersion,
  akka("akka-stream-testkit"),
  akkaHttp("akka-http-testkit"),
  akka("akka-actor-testkit-typed"),
  akka("akka-persistence-testkit")
)
存储库:https://github.com/LukBed/akka-persistence-http-test-issue
在每次测试前执行 persistenceTestKit.clearAll() 时,持久性存储中的所有数据都会被删除,但 MyStore actor 的内存状态内容保持不变——因此后续测试失败。
另一个后果是持久性存储将与 actor 的状态不同步。为了数据的一致性,最好提供一套与 Add/Read 处理方式类似的 Clear 命令/事件处理例程:
/** Store protocol and handlers, extended with a clear command/event pair. */
object MyStore {
  // ...

  /** Messages accepted by the store. */
  sealed trait Command

  /** Persists `s` and replies with the values as they are after the update. */
  case class AddCmd(s: String, replyTo: ActorRef[List[String]]) extends Command

  /** Replies with the current values; nothing is persisted. */
  case class ReadCmd(replyTo: ActorRef[List[String]]) extends Command

  /** Persists a ClearEvent and replies with the values as they are after the update. */
  case class ClearCmd(replyTo: ActorRef[List[String]]) extends Command

  /** Events written to the journal. */
  sealed trait Event
  case class AddEvent(s: String) extends Event
  case object ClearEvent extends Event

  /** State derived from the events; the most recently added value is at the head. */
  case class State(values: List[String] = Nil)

  /** Decides what to persist and what to reply for an incoming command. */
  def handleCommand(state: State, command: Command): ReplyEffect[Event, State] =
    command match {
      case ReadCmd(replyTo) =>
        Effect.reply(replyTo)(state.values)
      case AddCmd(s, replyTo) =>
        Effect.persist(AddEvent(s)).thenReply(replyTo)(updated => updated.values)
      case ClearCmd(replyTo) =>
        Effect.persist(ClearEvent).thenReply(replyTo)(updated => updated.values)
    }

  /** Applies a persisted event: an add prepends its value, a clear empties the list. */
  def handleEvent(state: State, event: Event): State =
    event match {
      case ClearEvent  => State(Nil)
      case AddEvent(s) => State(s :: state.values)
    }
}
现在您可以在每次测试之前,通过 route ~> check 发送 put 请求,让 Clear 命令/事件处理程序清除 actor 的内部状态和持久性日志:
/** Spec excerpt (parts elided) that sends a clear request through the route before every test. */
class MySpec extends AnyFunSuite with Matchers with ScalatestRouteTest with BeforeAndAfterEach {
// ...
val route: Route = {
...
// GET asks the actor with ReadCmd and completes with the reply joined by ";".
get {
val result = actor.ask(replyTo => ReadCmd(replyTo)).map(_.mkString(";"))
complete(result)
} ~
// PUT asks the actor with ClearCmd and completes with the reply joined by ";".
put {
val result = actor.ask(replyTo => ClearCmd(replyTo)).map(_.mkString(";"))
complete(result)
} ~
// POST reads the request body as a String, sends it in AddCmd and completes with "OK".
post { entity(as[String]) { newRecord =>
val result = actor.ask(replyTo => AddCmd(newRecord, replyTo)).map(_ => "OK")
complete(result)
} }
}
// ...
// Runs the clear request ahead of each test.
override def beforeEach(): Unit = {
initStateAndJournal()
}
// Sends PUT / through the route and expects an empty response body.
private def initStateAndJournal() = {
Put("/", "clear") ~> route ~> check {
responseAs[String] shouldEqual ""
}
}
// ...
}