如何正确验证 ZIO Test 中的计划调用
How to correctly verify scheduled invocations in ZIO Test
我是 ZIO 和 ZIO 测试的新手,我想测试我在 ZIO v1.0.0RC17 下编写的调度服务:
import zio.{RIO, Schedule}
import zio.clock.Clock
import zio.duration._
trait ModuleA {
def moduleA: ModuleA.Service
object ModuleA {
trait Service {
def schedule(arg: Int): RIO[Clock, Unit]
trait ModuleALive extends ModuleA {
def moduleB: ModuleB.Service
override def moduleA: ModuleA.Service = new ModuleA.Service {
override def schedule(arg: Int): RIO[Clock, Unit] = {
moduleB.run(arg).repeat(Schedule.spaced(1 day)).map(_ => ())
trait ModuleB {
def moduleB: ModuleB.Service
object ModuleB {
trait Service {
def run(arg: Int): RIO[Clock, Unit]
ModuleA 的服务基本上应该 运行 每天一次 ModuleB 的服务方法,并将参数输入 ModuleA.Service.run。
import java.util.concurrent.atomic.AtomicInteger
import zio.clock.Clock
import zio.duration._
import zio.test.environment.TestClock
import zio.test.{DefaultRunnableSpec, assertCompletes, suite, testM}
import zio.{RIO, Task, ZIO}
object ExampleSpec extends DefaultRunnableSpec(ExampleSuite.suite1)
object ExampleSuite {
val counter: AtomicInteger = new AtomicInteger(0)
trait ModuleBTest extends ModuleB {
override def moduleB: ModuleB.Service = new ModuleB.Service {
override def run(arg: Int): RIO[Clock, Unit] = ZIO.effectTotal(counter.incrementAndGet())
object ModuleATest extends ModuleALive with ModuleBTest
def verifyExpectedInvocationCount(expectedInvocationCount: Int): Task[Unit] = {
val actualInvocations = counter.get()
if (counter.get() == expectedInvocationCount)
throw new Exception(s"expected invocation count: $expectedInvocationCount but was $actualInvocations")
val suite1 = suite("a")(
testM("a should correctly schedule b") {
for {
_ <- ModuleATest.moduleA.schedule(42).fork
_ <- TestClock.adjust(12 hours)
_ <- verifyExpectedInvocationCount(1)
_ <- TestClock.adjust(12 hours)
_ <- verifyExpectedInvocationCount(2)
} yield assertCompletes
我使用计数器简化了测试,实际上我想使用 mockito 来验证调用计数以及正确的参数。但是,此测试不起作用。据我了解,这是因为 https://zio.dev/docs/howto/howto_test_effects#testing-clock.
现在,有一些示例说明如何使用 Promise 来解决这个问题。我尝试用这样的承诺替换计数器:
import java.util.concurrent.atomic.AtomicInteger
import zio.test.{DefaultRunnableSpec, assertCompletes, suite, testM}
import zio.{Promise, Task, UIO, ZIO}
object ExampleSpec extends DefaultRunnableSpec(ExampleSuite.suite1)
object ExampleSuite {
val counter: AtomicInteger = new AtomicInteger(0)
var promise: UIO[Promise[Unit, Int]] = Promise.make[Unit, Int]
trait ModuleBTest extends ModuleB {
override def moduleB: ModuleB.Service = new ModuleB.Service {
override def run(arg: Int) = promise.map(_.succeed(counter.incrementAndGet))
object ModuleATest extends ModuleALive with ModuleBTest
def verifyExpectedInvocationCount(expectedInvocationCount: Int, actualInvocations: Int): Task[Unit] = {
if (actualInvocations == expectedInvocationCount)
throw new Exception(s"expected invocation count: $expectedInvocationCount but was $actualInvocations")
val suite1 = suite("a")(
testM("a should correctly schedule b") {
for {
_ <- ModuleATest.moduleA.schedule(42).fork
p <- promise
actualInvocationCount <- p.await
_ <- verifyExpectedInvocationCount(expectedInvocationCount = 1, actualInvocationCount)
} yield assertCompletes
在你的例子中 promise
的类型是 UIO[Promise[Unit, Int]]
import zio.clock.Clock
import zio.duration._
import zio.test.environment.TestClock
import zio.test.{ assertCompletes, suite, testM, DefaultRunnableSpec }
import zio._
object ExampleSpec extends DefaultRunnableSpec {
trait ModuleA {
def moduleA: ModuleA.Service
object ModuleA {
trait Service {
def schedule(arg: Int): RIO[Clock, Unit]
trait ModuleALive extends ModuleA {
def moduleB: ModuleB.Service
override def moduleA: ModuleA.Service = new ModuleA.Service {
override def schedule(arg: Int): RIO[Clock, Unit] =
moduleB.run(arg).repeat(Schedule.spaced(1.day)).map(_ => ())
trait ModuleB {
def moduleB: ModuleB.Service
object ModuleB {
trait Service {
def run(arg: Int): RIO[Clock, Unit]
trait ModuleBTest extends ModuleB {
val counter: Ref[Int]
val invocations: Queue[Int]
override def moduleB: ModuleB.Service = new ModuleB.Service {
override def run(arg: Int): UIO[Unit] =
counter.updateAndGet(_ + 1).flatMap(invocations.offer).unit
object ModuleATest {
def apply(ref: Ref[Int], queue: Queue[Int]): ModuleALive with ModuleBTest =
new ModuleALive with ModuleBTest {
val counter = ref
val invocations = queue
def verifyExpectedInvocationCount(invocations: Queue[Int], expected: Int): Task[Unit] =
invocations.take.flatMap { actual =>
if (actual == expected)
ZIO.fail(new Exception(s"expected invocation count: $expected but was $actual"))
def spec = suite("a")(
testM("a should correctly schedule b") {
for {
counter <- Ref.make(0)
invocations <- Queue.unbounded[Int]
moduleATest = ModuleATest(counter, invocations)
_ <- moduleATest.moduleA.schedule(42).fork
_ <- TestClock.adjust(12.hours)
_ <- verifyExpectedInvocationCount(invocations, 1)
_ <- TestClock.adjust(12.hours)
_ <- verifyExpectedInvocationCount(invocations, 2)
} yield assertCompletes
由于我们要等待多个效果完成,所以我使用 Queue
- 您可以使用 ZIO 测试中的断言替换
- 此示例使用旧环境编码。有了层,组合这些服务并交换其中一个的测试实现会容易得多。
- 如果你想测试一个效果不会终止(例如,如果你没有等待足够的时间,另一个值永远不会放在队列中)你可以使用
我是 ZIO 和 ZIO 测试的新手,我想测试我在 ZIO v1.0.0RC17 下编写的调度服务:
import zio.{RIO, Schedule}
import zio.clock.Clock
import zio.duration._
trait ModuleA {
def moduleA: ModuleA.Service
object ModuleA {
trait Service {
def schedule(arg: Int): RIO[Clock, Unit]
trait ModuleALive extends ModuleA {
def moduleB: ModuleB.Service
override def moduleA: ModuleA.Service = new ModuleA.Service {
override def schedule(arg: Int): RIO[Clock, Unit] = {
moduleB.run(arg).repeat(Schedule.spaced(1 day)).map(_ => ())
trait ModuleB {
def moduleB: ModuleB.Service
object ModuleB {
trait Service {
def run(arg: Int): RIO[Clock, Unit]
ModuleA 的服务基本上应该 运行 每天一次 ModuleB 的服务方法,并将参数输入 ModuleA.Service.run。
import java.util.concurrent.atomic.AtomicInteger
import zio.clock.Clock
import zio.duration._
import zio.test.environment.TestClock
import zio.test.{DefaultRunnableSpec, assertCompletes, suite, testM}
import zio.{RIO, Task, ZIO}
object ExampleSpec extends DefaultRunnableSpec(ExampleSuite.suite1)
object ExampleSuite {
val counter: AtomicInteger = new AtomicInteger(0)
trait ModuleBTest extends ModuleB {
override def moduleB: ModuleB.Service = new ModuleB.Service {
override def run(arg: Int): RIO[Clock, Unit] = ZIO.effectTotal(counter.incrementAndGet())
object ModuleATest extends ModuleALive with ModuleBTest
def verifyExpectedInvocationCount(expectedInvocationCount: Int): Task[Unit] = {
val actualInvocations = counter.get()
if (counter.get() == expectedInvocationCount)
throw new Exception(s"expected invocation count: $expectedInvocationCount but was $actualInvocations")
val suite1 = suite("a")(
testM("a should correctly schedule b") {
for {
_ <- ModuleATest.moduleA.schedule(42).fork
_ <- TestClock.adjust(12 hours)
_ <- verifyExpectedInvocationCount(1)
_ <- TestClock.adjust(12 hours)
_ <- verifyExpectedInvocationCount(2)
} yield assertCompletes
我使用计数器简化了测试,实际上我想使用 mockito 来验证调用计数以及正确的参数。但是,此测试不起作用。据我了解,这是因为 https://zio.dev/docs/howto/howto_test_effects#testing-clock.
中描述的时序开销引入了竞争条件现在,有一些示例说明如何使用 Promise 来解决这个问题。我尝试用这样的承诺替换计数器:
import java.util.concurrent.atomic.AtomicInteger
import zio.test.{DefaultRunnableSpec, assertCompletes, suite, testM}
import zio.{Promise, Task, UIO, ZIO}
object ExampleSpec extends DefaultRunnableSpec(ExampleSuite.suite1)
object ExampleSuite {
val counter: AtomicInteger = new AtomicInteger(0)
var promise: UIO[Promise[Unit, Int]] = Promise.make[Unit, Int]
trait ModuleBTest extends ModuleB {
override def moduleB: ModuleB.Service = new ModuleB.Service {
override def run(arg: Int) = promise.map(_.succeed(counter.incrementAndGet))
object ModuleATest extends ModuleALive with ModuleBTest
def verifyExpectedInvocationCount(expectedInvocationCount: Int, actualInvocations: Int): Task[Unit] = {
if (actualInvocations == expectedInvocationCount)
throw new Exception(s"expected invocation count: $expectedInvocationCount but was $actualInvocations")
val suite1 = suite("a")(
testM("a should correctly schedule b") {
for {
_ <- ModuleATest.moduleA.schedule(42).fork
p <- promise
actualInvocationCount <- p.await
_ <- verifyExpectedInvocationCount(expectedInvocationCount = 1, actualInvocationCount)
} yield assertCompletes
在你的例子中 promise
的类型是 UIO[Promise[Unit, Int]]
import zio.clock.Clock
import zio.duration._
import zio.test.environment.TestClock
import zio.test.{ assertCompletes, suite, testM, DefaultRunnableSpec }
import zio._
object ExampleSpec extends DefaultRunnableSpec {
trait ModuleA {
def moduleA: ModuleA.Service
object ModuleA {
trait Service {
def schedule(arg: Int): RIO[Clock, Unit]
trait ModuleALive extends ModuleA {
def moduleB: ModuleB.Service
override def moduleA: ModuleA.Service = new ModuleA.Service {
override def schedule(arg: Int): RIO[Clock, Unit] =
moduleB.run(arg).repeat(Schedule.spaced(1.day)).map(_ => ())
trait ModuleB {
def moduleB: ModuleB.Service
object ModuleB {
trait Service {
def run(arg: Int): RIO[Clock, Unit]
trait ModuleBTest extends ModuleB {
val counter: Ref[Int]
val invocations: Queue[Int]
override def moduleB: ModuleB.Service = new ModuleB.Service {
override def run(arg: Int): UIO[Unit] =
counter.updateAndGet(_ + 1).flatMap(invocations.offer).unit
object ModuleATest {
def apply(ref: Ref[Int], queue: Queue[Int]): ModuleALive with ModuleBTest =
new ModuleALive with ModuleBTest {
val counter = ref
val invocations = queue
def verifyExpectedInvocationCount(invocations: Queue[Int], expected: Int): Task[Unit] =
invocations.take.flatMap { actual =>
if (actual == expected)
ZIO.fail(new Exception(s"expected invocation count: $expected but was $actual"))
def spec = suite("a")(
testM("a should correctly schedule b") {
for {
counter <- Ref.make(0)
invocations <- Queue.unbounded[Int]
moduleATest = ModuleATest(counter, invocations)
_ <- moduleATest.moduleA.schedule(42).fork
_ <- TestClock.adjust(12.hours)
_ <- verifyExpectedInvocationCount(invocations, 1)
_ <- TestClock.adjust(12.hours)
_ <- verifyExpectedInvocationCount(invocations, 2)
} yield assertCompletes
由于我们要等待多个效果完成,所以我使用 Queue
- 您可以使用 ZIO 测试中的断言替换
方法以获得更好的错误报告。 - 此示例使用旧环境编码。有了层,组合这些服务并交换其中一个的测试实现会容易得多。
- 如果你想测试一个效果不会终止(例如,如果你没有等待足够的时间,另一个值永远不会放在队列中)你可以使用