Akka官方教程:Parent actor在child被PoisonPill停止时没有收到终止消息
Akka official tutorial: Parent actor doesn't receive termination message when child is stopped with PoisonPill
我正在阅读 official Akka tutorial。我重复了教程中的操作,但得到了另一个结果。
我的源代码:
设备组:
package com.lightbend.akka.sample.iot;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class DeviceGroup extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
public DeviceGroup(String groupId) {
this.groupId = groupId;
}
public static Props props(String groupId) {
return Props.create(DeviceGroup.class, groupId);
}
@Override
public void preStart() {
log.info("DeviceGroup {} started", groupId);
}
@Override
public void postStop() {
log.info("DeviceGroup {} stopped", groupId);
}
private void onTrackDevice(Device.RequestTrackDevice trackMsg) {
if (this.groupId.equals(trackMsg.groupId)) {
ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
if (deviceActor != null) {
deviceActor.forward(trackMsg, getContext());
} else {
log.info("Creating device actor for {}", trackMsg.deviceId);
deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
actorToDeviceId.put(deviceActor, trackMsg.deviceId);
deviceActor.forward(trackMsg, getContext());
}
} else {
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
);
}
}
private void onTerminated(Terminated t) {
ActorRef deviceActor = t.getActor();
String deviceId = actorToDeviceId.get(deviceActor);
log.info("Device actor for {} has been terminated", deviceId);
actorToDeviceId.remove(deviceActor);
deviceIdToActor.remove(deviceId);
}
private void onDeviceList(RequestDeviceList r) {
getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Device.RequestTrackDevice.class, this::onTrackDevice)
.match(Terminated.class, this::onTerminated)
.match(RequestDeviceList.class, this::onDeviceList)
.build();
}
public static final class RequestDeviceList {
final long requestId;
public RequestDeviceList(long requestId) {
this.requestId = requestId;
}
}
public static final class ReplyDeviceList {
final long requestId;
final Set<String> ids;
public ReplyDeviceList(long requestId, Set<String> ids) {
this.requestId = requestId;
this.ids = ids;
}
}
}
设备:
package com.lightbend.akka.sample.iot;
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.Optional;
public class Device extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
final String deviceId;
public Device(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
public static Props props(String groupId, String deviceId) {
return Props.create(Device.class, groupId, deviceId);
}
Optional<Double> lastTemperatureReading = Optional.empty();
@Override
public void preStart() {
log.info("Device actor {}-{} started", groupId, deviceId);
}
@Override
public void postStop() {
log.info("Device actor {}-{} stopped", groupId, deviceId);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(RequestTrackDevice.class, r -> {
if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
getSender().tell(new DeviceRegistered(), getSelf());
} else {
log.warning(
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
r.groupId, r.deviceId, this.groupId, this.deviceId
);
}
})
.match(ReadTemperature.class, r -> {
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
})
.match(RecordTemperature.class, r -> {
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
lastTemperatureReading = Optional.of(r.value);
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
})
.build();
}
//temperature request
public static final class ReadTemperature {
long requestId;
public ReadTemperature(long requestId) {
this.requestId = requestId;
}
}
//temperature response
public static final class RespondTemperature {
long requestId;
Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
//==============
public static final class RecordTemperature {
final long requestId;
final double value;
public RecordTemperature(long requestId, double value) {
this.requestId = requestId;
this.value = value;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
//===================
public static final class RequestTrackDevice {
public final String groupId;
public final String deviceId;
public RequestTrackDevice(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
}
public static final class DeviceRegistered {
}
}
我尝试运行以下测试:
@Test
public void testListActiveDevicesAfterOneShutsDown() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new Device.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(Device.DeviceRegistered.class);
ActorRef toShutDown = probe.getLastSender();
groupActor.tell(new Device.RequestTrackDevice("group", "device2"), probe.getRef());
probe.expectMsgClass(Device.DeviceRegistered.class);
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(0L, reply.requestId);
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
probe.watch(toShutDown);
toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
probe.expectTerminated(toShutDown);
// using awaitAssert to retry because it might take longer for the groupActor
// to see the Terminated, that order is undefined
probe.awaitAssert(Duration.fromNanos(10_000_000_000L),() -> {
groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
DeviceGroup.ReplyDeviceList r =
probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(1L, r.requestId);
assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
return null;
});
}
我在日志中看到以下内容:
[INFO] [04/09/2018 17:56:31.976] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a] DeviceGroup group started
[INFO] [04/09/2018 17:56:31.977] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a] Creating device actor for device1
[INFO] [04/09/2018 17:56:31.979] [default-akka.actor.default-dispatcher-3] [akka://default/user/$a/device-device1] Device actor group-device1 started
[INFO] [04/09/2018 17:56:31.983] [default-akka.actor.default-dispatcher-3] [akka://default/user/$a] Creating device actor for device2
[INFO] [04/09/2018 17:56:31.983] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a/device-device2] Device actor group-device2 started
[INFO] [04/09/2018 17:56:31.992] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a/device-device1] Device actor group-device1 stopped
java.lang.AssertionError:
Expected :[device2]
Actual :[device1, device2]
我调试了一下,发现com.lightbend.akka.sample.iot.DeviceGroup#onTerminated
方法没有被调用
该代码有什么问题?
你不看演员,所以当他死的时候你不会收到通知。
链接的教程包含以下代码行,您没有包含在上面的代码中,这很可能是您的 onTerminated
未被调用的原因:
getContext().watch(deviceActor);
我正在阅读 official Akka tutorial。我重复了教程中的操作,但得到了另一个结果。
我的源代码:
设备组:
package com.lightbend.akka.sample.iot;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class DeviceGroup extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
public DeviceGroup(String groupId) {
this.groupId = groupId;
}
public static Props props(String groupId) {
return Props.create(DeviceGroup.class, groupId);
}
@Override
public void preStart() {
log.info("DeviceGroup {} started", groupId);
}
@Override
public void postStop() {
log.info("DeviceGroup {} stopped", groupId);
}
private void onTrackDevice(Device.RequestTrackDevice trackMsg) {
if (this.groupId.equals(trackMsg.groupId)) {
ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
if (deviceActor != null) {
deviceActor.forward(trackMsg, getContext());
} else {
log.info("Creating device actor for {}", trackMsg.deviceId);
deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
deviceIdToActor.put(trackMsg.deviceId, deviceActor);
actorToDeviceId.put(deviceActor, trackMsg.deviceId);
deviceActor.forward(trackMsg, getContext());
}
} else {
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
);
}
}
private void onTerminated(Terminated t) {
ActorRef deviceActor = t.getActor();
String deviceId = actorToDeviceId.get(deviceActor);
log.info("Device actor for {} has been terminated", deviceId);
actorToDeviceId.remove(deviceActor);
deviceIdToActor.remove(deviceId);
}
private void onDeviceList(RequestDeviceList r) {
getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Device.RequestTrackDevice.class, this::onTrackDevice)
.match(Terminated.class, this::onTerminated)
.match(RequestDeviceList.class, this::onDeviceList)
.build();
}
public static final class RequestDeviceList {
final long requestId;
public RequestDeviceList(long requestId) {
this.requestId = requestId;
}
}
public static final class ReplyDeviceList {
final long requestId;
final Set<String> ids;
public ReplyDeviceList(long requestId, Set<String> ids) {
this.requestId = requestId;
this.ids = ids;
}
}
}
设备:
package com.lightbend.akka.sample.iot;
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.Optional;
public class Device extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
final String deviceId;
public Device(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
public static Props props(String groupId, String deviceId) {
return Props.create(Device.class, groupId, deviceId);
}
Optional<Double> lastTemperatureReading = Optional.empty();
@Override
public void preStart() {
log.info("Device actor {}-{} started", groupId, deviceId);
}
@Override
public void postStop() {
log.info("Device actor {}-{} stopped", groupId, deviceId);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(RequestTrackDevice.class, r -> {
if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
getSender().tell(new DeviceRegistered(), getSelf());
} else {
log.warning(
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
r.groupId, r.deviceId, this.groupId, this.deviceId
);
}
})
.match(ReadTemperature.class, r -> {
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
})
.match(RecordTemperature.class, r -> {
log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
lastTemperatureReading = Optional.of(r.value);
getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
})
.build();
}
//temperature request
public static final class ReadTemperature {
long requestId;
public ReadTemperature(long requestId) {
this.requestId = requestId;
}
}
//temperature response
public static final class RespondTemperature {
long requestId;
Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
//==============
public static final class RecordTemperature {
final long requestId;
final double value;
public RecordTemperature(long requestId, double value) {
this.requestId = requestId;
this.value = value;
}
}
public static final class TemperatureRecorded {
final long requestId;
public TemperatureRecorded(long requestId) {
this.requestId = requestId;
}
}
//===================
public static final class RequestTrackDevice {
public final String groupId;
public final String deviceId;
public RequestTrackDevice(String groupId, String deviceId) {
this.groupId = groupId;
this.deviceId = deviceId;
}
}
public static final class DeviceRegistered {
}
}
我尝试运行以下测试:
@Test
public void testListActiveDevicesAfterOneShutsDown() {
TestKit probe = new TestKit(system);
ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
groupActor.tell(new Device.RequestTrackDevice("group", "device1"), probe.getRef());
probe.expectMsgClass(Device.DeviceRegistered.class);
ActorRef toShutDown = probe.getLastSender();
groupActor.tell(new Device.RequestTrackDevice("group", "device2"), probe.getRef());
probe.expectMsgClass(Device.DeviceRegistered.class);
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(0L, reply.requestId);
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
probe.watch(toShutDown);
toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
probe.expectTerminated(toShutDown);
// using awaitAssert to retry because it might take longer for the groupActor
// to see the Terminated, that order is undefined
probe.awaitAssert(Duration.fromNanos(10_000_000_000L),() -> {
groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
DeviceGroup.ReplyDeviceList r =
probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
assertEquals(1L, r.requestId);
assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
return null;
});
}
我在日志中看到以下内容:
[INFO] [04/09/2018 17:56:31.976] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a] DeviceGroup group started
[INFO] [04/09/2018 17:56:31.977] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a] Creating device actor for device1
[INFO] [04/09/2018 17:56:31.979] [default-akka.actor.default-dispatcher-3] [akka://default/user/$a/device-device1] Device actor group-device1 started
[INFO] [04/09/2018 17:56:31.983] [default-akka.actor.default-dispatcher-3] [akka://default/user/$a] Creating device actor for device2
[INFO] [04/09/2018 17:56:31.983] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a/device-device2] Device actor group-device2 started
[INFO] [04/09/2018 17:56:31.992] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a/device-device1] Device actor group-device1 stopped
java.lang.AssertionError:
Expected :[device2]
Actual :[device1, device2]
我调试了一下,发现com.lightbend.akka.sample.iot.DeviceGroup#onTerminated
方法没有被调用
该代码有什么问题?
你不看演员,所以当他死的时候你不会收到通知。
链接的教程包含以下代码行,您没有包含在上面的代码中,这很可能是您的 onTerminated
未被调用的原因:
getContext().watch(deviceActor);