流会被终止吗?
Will the stream be terminated?
我有演员,如下所示:
如图所示,ActorStream
是 Actor
的子代。
问题是,当我终止Actor
时,ActorStream
是否也会终止?
方法如下,我如何在 Actor
中创建 ActorStream
:
def create(fsm: ActorRef[ServerHealth], cancel: Option[Cancellable]): Behavior[ServerHealthStreamer] =
Behaviors.setup { context =>
implicit val system = context.system
implicit val materializer = ActorMaterializer()
implicit val dispatcher = materializer.executionContext
val kafkaServer = system
.settings
.config
.getConfig("kafka")
.getString("servers")
val sink: Sink[ServerHealth, NotUsed] = ActorSink.actorRefWithAck[ServerHealth, ServerHealthStreamer, Ack](
ref = context.self,
onCompleteMessage = Complete,
onFailureMessage = Fail.apply,
messageAdapter = Message.apply,
onInitMessage = Init.apply,
ackMessage = Ack)
val cancel = Source.tick(1.seconds, 15.seconds, NotUsed)
.flatMapConcat(_ => Source.fromFuture(health(kafkaServer)))
.map {
case true =>
KafkaActive
case false =>
KafkaInactive
}
.to(sink)
.run()
Behaviors.receiveMessage {
case Init(ackTo) =>
ackTo ! Ack
Behaviors.same
case Message(ackTo, msg) =>
fsm ! msg
ackTo ! Ack
create(fsm, Some(cancel))
case Complete =>
Behaviors.same
case Fail(_) =>
fsm ! KafkaInactive
Behaviors.same
}
}
在您的情况下,actor 终止必须终止流,因为在幕后阶段 actor 观看通过了 actorRef 并且如果 Terminated 到达则完成阶段
我想你可以在这里找到更多信息
https://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/
An extremely important aspect to understand is that the materialized
stream is running as a set of actors on the threads of the execution
context on which they were allocated. In other words, the stream is
running independently from the actor that allocated it. This becomes
very important if the stream is long-running, or even infinite, and we
want the actor to manage the life-cycle of the stream, such that when
the actor stops, the stream is terminated. Expanding on the example
above, I will make the stream infinite and use a KillSwitch to manage
the life-cycle of the stream.
我有演员,如下所示:
如图所示,ActorStream
是 Actor
的子代。
问题是,当我终止Actor
时,ActorStream
是否也会终止?
方法如下,我如何在 Actor
中创建 ActorStream
:
def create(fsm: ActorRef[ServerHealth], cancel: Option[Cancellable]): Behavior[ServerHealthStreamer] =
Behaviors.setup { context =>
implicit val system = context.system
implicit val materializer = ActorMaterializer()
implicit val dispatcher = materializer.executionContext
val kafkaServer = system
.settings
.config
.getConfig("kafka")
.getString("servers")
val sink: Sink[ServerHealth, NotUsed] = ActorSink.actorRefWithAck[ServerHealth, ServerHealthStreamer, Ack](
ref = context.self,
onCompleteMessage = Complete,
onFailureMessage = Fail.apply,
messageAdapter = Message.apply,
onInitMessage = Init.apply,
ackMessage = Ack)
val cancel = Source.tick(1.seconds, 15.seconds, NotUsed)
.flatMapConcat(_ => Source.fromFuture(health(kafkaServer)))
.map {
case true =>
KafkaActive
case false =>
KafkaInactive
}
.to(sink)
.run()
Behaviors.receiveMessage {
case Init(ackTo) =>
ackTo ! Ack
Behaviors.same
case Message(ackTo, msg) =>
fsm ! msg
ackTo ! Ack
create(fsm, Some(cancel))
case Complete =>
Behaviors.same
case Fail(_) =>
fsm ! KafkaInactive
Behaviors.same
}
}
在您的情况下,actor 终止必须终止流,因为在幕后阶段 actor 观看通过了 actorRef 并且如果 Terminated 到达则完成阶段
我想你可以在这里找到更多信息 https://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/
An extremely important aspect to understand is that the materialized stream is running as a set of actors on the threads of the execution context on which they were allocated. In other words, the stream is running independently from the actor that allocated it. This becomes very important if the stream is long-running, or even infinite, and we want the actor to manage the life-cycle of the stream, such that when the actor stops, the stream is terminated. Expanding on the example above, I will make the stream infinite and use a KillSwitch to manage the life-cycle of the stream.