为什么添加 Thread.sleep 会使我的 Akka TestKit 单元测试通过?

Why does adding Thread.sleep make my Akka TestKit unit tests pass?

Java 8 和 Akka (Java API) 2.12:2.5.16 这里。我收到以下消息:

public class SomeMessage {
    private int anotherNum;

    public SomeMessage(int anotherNum) {
        this.anotherNum = anotherNum;
    }

    public int getAnotherNum() {
        return anotherNum;
    }

    public void setAnotherNum(int anotherNum) {
        this.anotherNum = anotherNum;
    }
}

以及以下演员:

public class TestActor extends AbstractActor {
    private Integer number;

    public TestActor(Integer number) {
        this.number = number;
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .matchAny(message -> {
                if (message instanceof SomeMessage) {
                    SomeMessage someMessage = (SomeMessage) message;
                    System.out.println("someMessage contains = " + someMessage.getAnotherNum());
                    someMessage.setAnotherNum(number);
                }
            }).build();
    }
}

以及以下单元测试:

@RunWith(MockitoJUnitRunner.class)
public class TestActorTest {
    static ActorSystem actorSystem;

    @BeforeClass
    public static void setup() {
        actorSystem = ActorSystem.create();
    }

    @AfterClass
    public static void teardown() {
        TestKit.shutdownActorSystem(actorSystem, Duration.create("10 seconds"), true);
        actorSystem = null;
    }

    @Test
    public void should_alter_some_message() {
        // given
        ActorRef testActor = actorSystem.actorOf(Props.create(TestActor.class, 10), "test.actor");
        SomeMessage someMessage = new SomeMessage(5);

        // when
        testActor.tell(someMessage, ActorRef.noSender());

        // then
        assertEquals(10, someMessage.getAnotherNum());
    }
}

所以我要验证的是 TestActor 实际上收到了一个 SomeMessage 并且它改变了它的内部状态。

当我 运行 这个单元测试时,它失败了,就好像演员从未收到消息一样:

java.lang.AssertionError: 
Expected :10
Actual   :5
 <Click to see difference>

    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.failNotEquals(Assert.java:834)
    at org.junit.Assert.assertEquals(Assert.java:645)
  <rest of trace omitted for brevity>

[INFO] [01/30/2019 12:50:26.780] [default-akka.actor.default-dispatcher-2] [akka://default/user/test.actor] Message [myapp.actors.core.SomeMessage] without sender to Actor[akka://default/user/test.actor#2008219661] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://default/user/test.actor#2008219661]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

但是当我修改测试方法并在其中引入一个Thread.sleep(5000)(在tell(...)之后)时,它以优异的成绩通过了:

@Test
public void should_alter_some_message() throws InterruptedException {
    // given
    ActorRef testActor = actorSystem.actorOf(Props.create(TestActor.class, null, 10), "test.actor");
    SomeMessage someMessage = new SomeMessage(5);

    // when
    testActor.tell(someMessage, ActorRef.noSender());

    Thread.sleep(5000);

    // then
    assertEquals(10, someMessage.getAnotherNum());
}

这是怎么回事?! 显然我不希望我的 actor 测试乱七八糟 sleeps,所以我在这里做错了什么以及修复方法是什么?提前致谢!

我认为你没有让演员完成他的工作。也许 AkkaActor 启动了他自己的线程?我认为 Actor 确实实现了 Runnable,但我并不是真正的 Akka 专家。 --> edit Actor 是一个界面,庆幸我说我不是专家..

我的猜测是,通过休眠您的主线程,您可以给 Actor 的 "thread" 时间来完成他的方法。

我知道这可能没有帮助,但太长了无法发表评论。 : (

@Asier Aranbarri 说得对,您不让演员完成工作。

Actor 具有异步性质,虽然它们没有实现 Runnable,但它们与用于发送消息的线程分开执行。

您向演员发送消息,然后立即断言消息已更改。由于 actor 在异步上下文中运行,即不同的线程,它仍然没有处理传入的消息。因此,放置 Threed.sleep 允许 actor 处理消息,并且只有在此之后断言才完成。

我可能会建议对您的初始设计进行一些更改,以便与 akka 自然结合得很好。

首先,akka不建议使用具有可变性的消息。它们必须是不可变的。在你的情况下,这被方法 SomeMessage#setAnotherNum 打破了。删除它:

public class SomeMessage {
    private int anotherNum;

    public SomeMessage(int anotherNum) {
        this.anotherNum = anotherNum;
    }

    public int getAnotherNum() {
        return anotherNum;
    }
}

在此之后创建 SomeMessage 的新实例而不是更改 TestActor 中的传入消息并将其发送回 context.sender()。就像这里定义的那样

static public class TestActor extends AbstractActor {
    private Integer number;

    public TestActor(Integer number) {
        this.number = number;
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchAny(message -> {
                    if (message instanceof SomeMessage) {
                        SomeMessage someMessage = (SomeMessage) message;
                        System.out.println("someMessage contains = " + someMessage.getAnotherNum());
                        context().sender().tell(new SomeMessage(number + someMessage.getAnotherNum()), context().self());
                    }
                }).build();
    }
}

现在,不再更改消息的内部状态,而是创建一条具有新状态的新消息,并将稍后的消息返回给 sender()。这是akka的恰当用法。

这允许测试使用 TestProbe 并重新定义如下

@Test
public void should_alter_some_message() {
    // given
    ActorRef testActor = actorSystem.actorOf(Props.create(TestActor.class,10));
    TestJavaActor.SomeMessage someMessage = new SomeMessage(5);
    TestProbe testProbe = TestProbe.apply(actorSystem);

    // when
    testActor.tell(someMessage, testProbe.ref());

    // then
    testProbe.expectMsg(new SomeMessage(15));
}

TestProbe 模拟发件人并捕获来自 TestActor 的所有传入 message/replies。请注意,使用 expectMsg(new SomeMessage(15)) 而不是断言。它有一个内部阻塞机制,在断言完成之前等待消息被接收。这就是 testing actors example.

中发生的事情

要使 expectMsg 正确断言,您必须重写 class SomeMessage

中的 equals 方法

编辑:

Why does Akka frown upon changing SomeMessage's internal state?

akka 的强大功能之一是它不需要同步或 wait/notify 来控制对共享数据的访问。但这只能通过消息不变性来实现。想象一下,您发送了一条可变消息,您在 actor 处理它的确切时间对其进行了更改。这可能会导致竞争条件。阅读 this 了解更多详情。

And (2) does the same apply to changing Actors' internal state? Is it "OK" for ActorRefs to have properties that can be changed, or does the community frown upon that as well (and if so, why!)?

不,这不适用于此处。如果任何状态被封装在一个 actor 中并且只有它可以改变它,那么具有可变性是完全没问题的。