Sink inside Actor 在生产期间调用,但在测试期间不调用
Sink inside Actor invoked during prod, but not during test
我有一个调用流的演员。在 运行 时,这会按预期工作,但在测试时不会调用流。
演员(缩写)
class PaymentProcessorActor(repo: PaymentRepo, accountCache: AccountCache, config: AppConfig) extends Actor {
implicit private val materializer: ActorMaterializer = ActorMaterializer()
implicit private val network: Network = config.network
private implicit val ec: ExecutionContextExecutor = context.dispatcher
val paymentSink: Sink[(Seq[Payment], Account), NotUsed] =
Flow[(Seq[Payment], Account)].map { case (ps, account) =>
println("inside flow")
// ... block of type Future[(TransactionResponse, Seq[Payment], Account)] here
}
.mapAsync(parallelism = config.accounts.size)(_.map {
case ((_: TransactionApproved, ps), account) =>
// handle approval
case ((x: TransactionRejected, ps), account) =>
// handle rejection
})
.to(Sink.ignore)
override def receive: Receive = state(nextKnownPaymentDate = None)
private def state(nextKnownPaymentDate: Option[ZonedDateTime]): Receive =
processPayments(nextKnownPaymentDate) orElse
updateNextPaymentTime orElse
confirmPayments orElse
rejectPayments orElse
rejectTransaction orElse
retryPayments orElse
updateAccount orElse
registerAccount
// If there are payments due, find and pay them
def processPayments(nextKnownPaymentDate: Option[ZonedDateTime]): PartialFunction[Any, Unit] = {
case ProcessPayments if nextKnownPaymentDate.exists(_.isBefore(ZonedDateTime.now())) =>
val readyAccounts = accountCache.readyCount
if (readyAccounts > 0) {
val payments = repo.due(readyAccounts * 100)
if (payments.isEmpty) {
logger.debug("No more payments due.")
context.become(state(repo.earliestTimeDue))
} else {
val submittingPaymentsWithAccounts: Seq[(Seq[Payment], Account)] =
payments.grouped(100).flatMap(ps => accountCache.borrowAccount.map(ps -> _)).toSeq
val submittingPayments: Seq[Payment] = submittingPaymentsWithAccounts.flatMap(_._1)
repo.submit(submittingPayments.flatMap(_.id), ZonedDateTime.now)
Source.fromIterator(() => submittingPaymentsWithAccounts.iterator).to(paymentSink).run()
println("post source run")
}
}
}
规范。 (sampleOf
只是创建了一个随机实例,与问题无关)。
"the payment sink" should {
"submit to the network" in {
val (network, conf, repo, cache) = setup
val account = sampleOf(genAccount)
val payments = sampleOf(Gen.listOfN(3, genPayment))
when(repo.earliestTimeDue).thenReturn(Some(ZonedDateTime.now()))
when(repo.due(100)).thenReturn(payments)
val actor = system.actorOf(Props(new PaymentProcessorActor(repo, cache, conf)))
// these two calls set up the actor state so that payments will be processed
actor ! UpdateNextPaymentTime
actor ! UpdateAccount(account)
// this invokes the stream under test
actor ! ProcessPayments
eventually(timeout(5 seconds)) {
assert(network.posted.size == 1)
}
}
}
private def setup: (StubNetwork, AppConfig, PaymentRepo, AccountCache) = {
val n = StubNetwork()
val conf = new AppConfig {
val network: Network = n
val accounts: Map[String, KeyPair] = Map.empty
}
val repo = mock[PaymentRepo]
(n, conf, repo, new AccountCache)
}
在 运行 时,我看到标准输出消息:
post source run
inside flow
但在测试期间我只看到
post source run
通过调试,我看到所有值都是正确的并且调用了源 .run
。但不知何故它没有 运行.
在行 .mapAsync(parallelism = config.accounts.size)
中,值为零,这是一个错误条件。 Flow
从未初始化。此失败不会传播到主线程。
此外,我在配置中关闭了用于测试的 Akka 日志记录,因此未记录此失败。
我有一个调用流的演员。在 运行 时,这会按预期工作,但在测试时不会调用流。
演员(缩写)
class PaymentProcessorActor(repo: PaymentRepo, accountCache: AccountCache, config: AppConfig) extends Actor {
implicit private val materializer: ActorMaterializer = ActorMaterializer()
implicit private val network: Network = config.network
private implicit val ec: ExecutionContextExecutor = context.dispatcher
val paymentSink: Sink[(Seq[Payment], Account), NotUsed] =
Flow[(Seq[Payment], Account)].map { case (ps, account) =>
println("inside flow")
// ... block of type Future[(TransactionResponse, Seq[Payment], Account)] here
}
.mapAsync(parallelism = config.accounts.size)(_.map {
case ((_: TransactionApproved, ps), account) =>
// handle approval
case ((x: TransactionRejected, ps), account) =>
// handle rejection
})
.to(Sink.ignore)
override def receive: Receive = state(nextKnownPaymentDate = None)
private def state(nextKnownPaymentDate: Option[ZonedDateTime]): Receive =
processPayments(nextKnownPaymentDate) orElse
updateNextPaymentTime orElse
confirmPayments orElse
rejectPayments orElse
rejectTransaction orElse
retryPayments orElse
updateAccount orElse
registerAccount
// If there are payments due, find and pay them
def processPayments(nextKnownPaymentDate: Option[ZonedDateTime]): PartialFunction[Any, Unit] = {
case ProcessPayments if nextKnownPaymentDate.exists(_.isBefore(ZonedDateTime.now())) =>
val readyAccounts = accountCache.readyCount
if (readyAccounts > 0) {
val payments = repo.due(readyAccounts * 100)
if (payments.isEmpty) {
logger.debug("No more payments due.")
context.become(state(repo.earliestTimeDue))
} else {
val submittingPaymentsWithAccounts: Seq[(Seq[Payment], Account)] =
payments.grouped(100).flatMap(ps => accountCache.borrowAccount.map(ps -> _)).toSeq
val submittingPayments: Seq[Payment] = submittingPaymentsWithAccounts.flatMap(_._1)
repo.submit(submittingPayments.flatMap(_.id), ZonedDateTime.now)
Source.fromIterator(() => submittingPaymentsWithAccounts.iterator).to(paymentSink).run()
println("post source run")
}
}
}
规范。 (sampleOf
只是创建了一个随机实例,与问题无关)。
"the payment sink" should {
"submit to the network" in {
val (network, conf, repo, cache) = setup
val account = sampleOf(genAccount)
val payments = sampleOf(Gen.listOfN(3, genPayment))
when(repo.earliestTimeDue).thenReturn(Some(ZonedDateTime.now()))
when(repo.due(100)).thenReturn(payments)
val actor = system.actorOf(Props(new PaymentProcessorActor(repo, cache, conf)))
// these two calls set up the actor state so that payments will be processed
actor ! UpdateNextPaymentTime
actor ! UpdateAccount(account)
// this invokes the stream under test
actor ! ProcessPayments
eventually(timeout(5 seconds)) {
assert(network.posted.size == 1)
}
}
}
private def setup: (StubNetwork, AppConfig, PaymentRepo, AccountCache) = {
val n = StubNetwork()
val conf = new AppConfig {
val network: Network = n
val accounts: Map[String, KeyPair] = Map.empty
}
val repo = mock[PaymentRepo]
(n, conf, repo, new AccountCache)
}
在 运行 时,我看到标准输出消息:
post source run
inside flow
但在测试期间我只看到
post source run
通过调试,我看到所有值都是正确的并且调用了源 .run
。但不知何故它没有 运行.
在行 .mapAsync(parallelism = config.accounts.size)
中,值为零,这是一个错误条件。 Flow
从未初始化。此失败不会传播到主线程。
此外,我在配置中关闭了用于测试的 Akka 日志记录,因此未记录此失败。