为什么在 akka-stream 的单元测试期间会出现此超时?
Why am I getting this timeout during unit test of akka-stream?
我有一个 akka-gRPC
服务双向流,我正在单元测试中测试它。该服务使用 akka-stream
,我使用 TestSink.probe
来测试回复消息。我正在从服务接收回消息,但存在与超时相关的错误,我无法弄清楚是什么原因。这是测试:
object GreeterServiceConf {
// important to enable HTTP/2 in server ActorSystem's config
val configServer = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication())
val configString2 =
"""
|akka.grpc.client {
| "helloworld.GreeterService" {
| host = 127.0.0.1
| port = 8080
| }
|}
|""".stripMargin
val configClient = ConfigFactory.parseString(configString2)
}
class GreeterServiceImplSpec extends TestKit(ActorSystem("GreeterServiceImplSpec", ConfigFactory.load(GreeterServiceConf.configServer)))
with AnyWordSpecLike
with BeforeAndAfterAll
with Matchers
with ScalaFutures {
implicit val patience: PatienceConfig = PatienceConfig(scaled(5.seconds), scaled(100.millis))
// val testKit = ActorTestKit(conf)
val serverSystem: ActorSystem = system
val bound = new GreeterServer(serverSystem).run()
// make sure server is bound before using client
bound.futureValue
implicit val clientSystem: ActorSystem = ActorSystem("GreeterClient", ConfigFactory.load(GreeterServiceConf.configClient))
val client = GreeterServiceClient(
GrpcClientSettings
.fromConfig("helloworld.GreeterService")
.withTls(false)
)
override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
TestKit.shutdownActorSystem(clientSystem)
}
"GreeterService" should {
"reply to multiple requests" in {
import GreeterServiceData._
val names = List("John", "Michael", "Simone")
val expectedReply: immutable.Seq[HelloReply] = names.map { name =>
HelloReply(s"Hello, $name -> ${mapHelloReply.getOrElse(name, "this person does not exist =(")}")
}
val requestStream: Source[HelloRequest, NotUsed] = Source(names).map(name => HelloRequest(name))
val responseStream: Source[HelloReply, NotUsed] = client.sayHelloToAll(requestStream)
val sink = TestSink.probe[HelloReply]
val replyStream = responseStream.runWith(sink)
replyStream
.requestNext(HelloReply(s"Hello, John -> I killed Java"))
.requestNext(HelloReply(s"Hello, Michael -> We are the Jacksons 5"))
.requestNext(HelloReply(s"Hello, Simone -> I have found a job to work with Scala =)")) // THIS IS THE LINE 122 ON THE ERROR
// .request(3)
// .expectNextUnorderedN(expectedReply) // I also tested this but it did not work
.expectComplete()
}
}
}
错误是:
assertion failed: timeout (3 seconds) during expectMsg while waiting
for OnComplete java.lang.AssertionError: assertion failed: timeout (3
seconds) during expectMsg while waiting for OnComplete at
scala.Predef$.assert(Predef.scala:223) at
akka.testkit.TestKitBase.expectMsg_internal(TestKit.scala:459) at
akka.testkit.TestKitBase.expectMsg(TestKit.scala:436) at
akka.testkit.TestKitBase.expectMsg$(TestKit.scala:436) at
akka.testkit.TestKit.expectMsg(TestKit.scala:969) at
akka.stream.testkit.TestSubscriber$ManualProbe.expectComplete(StreamTestKit.scala:479)
at
com.example.helloworld.GreeterServiceImplSpec.$anonfun$new(GreeterServiceImplSpec.scala:121)
我根据项目akka-grpc-quickstart-scala.g8让它工作。我正在执行 runForeach
到 运行 图表,并在响应流上有一个具体化的 Sink
。然后,当响应完成后,我在 Future[Done]
.
中执行 assert
"reply to multiple requests" in {
import GreeterServiceData._
import system.dispatcher
val names = List("John", "Martin", "Michael", "UnknownPerson")
val expectedReplySeq: immutable.Seq[HelloReply] = names.map { name =>
HelloReply(s"Hello, $name -> ${mapHelloReply.getOrElse(name, "this person does not exist =(")}")
}
// println(s"expectedReplySeq: ${expectedReplySeq.foreach(println)}")
val requestStream: Source[HelloRequest, NotUsed] = Source(names).map(name => HelloRequest(name))
val responseStream: Source[HelloReply, NotUsed] = client.sayHelloToAll(requestStream)
val done: Future[Done] = responseStream.runForeach { reply: HelloReply =>
// println(s"got streaming reply: ${reply.message}")
assert(expectedReplySeq.contains(reply))
}
// OR USING Sink.foreach[HelloReply])(Keep.right)
val sinkHelloReply = Sink.foreach[HelloReply] { e =>
println(s"element: $e")
assert(expectedReplySeq.contains(e))
}
responseStream.toMat(sinkHelloReply)(Keep.right).run().onComplete {
case Success(value) => println(s"done")
case Failure(exception) => println(s"exception $exception")
}
}
为了保持整个代码的引用,GreeterServiceImplSpec class is here.
我有一个 akka-gRPC
服务双向流,我正在单元测试中测试它。该服务使用 akka-stream
,我使用 TestSink.probe
来测试回复消息。我正在从服务接收回消息,但存在与超时相关的错误,我无法弄清楚是什么原因。这是测试:
object GreeterServiceConf {
// important to enable HTTP/2 in server ActorSystem's config
val configServer = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication())
val configString2 =
"""
|akka.grpc.client {
| "helloworld.GreeterService" {
| host = 127.0.0.1
| port = 8080
| }
|}
|""".stripMargin
val configClient = ConfigFactory.parseString(configString2)
}
class GreeterServiceImplSpec extends TestKit(ActorSystem("GreeterServiceImplSpec", ConfigFactory.load(GreeterServiceConf.configServer)))
with AnyWordSpecLike
with BeforeAndAfterAll
with Matchers
with ScalaFutures {
implicit val patience: PatienceConfig = PatienceConfig(scaled(5.seconds), scaled(100.millis))
// val testKit = ActorTestKit(conf)
val serverSystem: ActorSystem = system
val bound = new GreeterServer(serverSystem).run()
// make sure server is bound before using client
bound.futureValue
implicit val clientSystem: ActorSystem = ActorSystem("GreeterClient", ConfigFactory.load(GreeterServiceConf.configClient))
val client = GreeterServiceClient(
GrpcClientSettings
.fromConfig("helloworld.GreeterService")
.withTls(false)
)
override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
TestKit.shutdownActorSystem(clientSystem)
}
"GreeterService" should {
"reply to multiple requests" in {
import GreeterServiceData._
val names = List("John", "Michael", "Simone")
val expectedReply: immutable.Seq[HelloReply] = names.map { name =>
HelloReply(s"Hello, $name -> ${mapHelloReply.getOrElse(name, "this person does not exist =(")}")
}
val requestStream: Source[HelloRequest, NotUsed] = Source(names).map(name => HelloRequest(name))
val responseStream: Source[HelloReply, NotUsed] = client.sayHelloToAll(requestStream)
val sink = TestSink.probe[HelloReply]
val replyStream = responseStream.runWith(sink)
replyStream
.requestNext(HelloReply(s"Hello, John -> I killed Java"))
.requestNext(HelloReply(s"Hello, Michael -> We are the Jacksons 5"))
.requestNext(HelloReply(s"Hello, Simone -> I have found a job to work with Scala =)")) // THIS IS THE LINE 122 ON THE ERROR
// .request(3)
// .expectNextUnorderedN(expectedReply) // I also tested this but it did not work
.expectComplete()
}
}
}
错误是:
assertion failed: timeout (3 seconds) during expectMsg while waiting for OnComplete java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg while waiting for OnComplete at scala.Predef$.assert(Predef.scala:223) at akka.testkit.TestKitBase.expectMsg_internal(TestKit.scala:459) at akka.testkit.TestKitBase.expectMsg(TestKit.scala:436) at akka.testkit.TestKitBase.expectMsg$(TestKit.scala:436) at akka.testkit.TestKit.expectMsg(TestKit.scala:969) at akka.stream.testkit.TestSubscriber$ManualProbe.expectComplete(StreamTestKit.scala:479) at com.example.helloworld.GreeterServiceImplSpec.$anonfun$new(GreeterServiceImplSpec.scala:121)
我根据项目akka-grpc-quickstart-scala.g8让它工作。我正在执行 runForeach
到 运行 图表,并在响应流上有一个具体化的 Sink
。然后,当响应完成后,我在 Future[Done]
.
assert
"reply to multiple requests" in {
import GreeterServiceData._
import system.dispatcher
val names = List("John", "Martin", "Michael", "UnknownPerson")
val expectedReplySeq: immutable.Seq[HelloReply] = names.map { name =>
HelloReply(s"Hello, $name -> ${mapHelloReply.getOrElse(name, "this person does not exist =(")}")
}
// println(s"expectedReplySeq: ${expectedReplySeq.foreach(println)}")
val requestStream: Source[HelloRequest, NotUsed] = Source(names).map(name => HelloRequest(name))
val responseStream: Source[HelloReply, NotUsed] = client.sayHelloToAll(requestStream)
val done: Future[Done] = responseStream.runForeach { reply: HelloReply =>
// println(s"got streaming reply: ${reply.message}")
assert(expectedReplySeq.contains(reply))
}
// OR USING Sink.foreach[HelloReply])(Keep.right)
val sinkHelloReply = Sink.foreach[HelloReply] { e =>
println(s"element: $e")
assert(expectedReplySeq.contains(e))
}
responseStream.toMat(sinkHelloReply)(Keep.right).run().onComplete {
case Success(value) => println(s"done")
case Failure(exception) => println(s"exception $exception")
}
}
为了保持整个代码的引用,GreeterServiceImplSpec class is here.