Why is the stream stopped immediately, and how can I prevent it?
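In a simple test, I want a stream to generate and print numbers for one second. I want to test stream operations that deal with backpressure, so I need a Source that does not support backpressure.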
... with FreeSpec ... {
  implicit val system = ActorSystem(this.getClass.getSimpleName)
  private val matSettings: ActorMaterializerSettings =
    ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true)
  implicit val mat = ActorMaterializer(matSettings.withInputBuffer(1, 1))
  "must print numbers for a second" in {
    val source: Source[Double, ActorRef] =
      Source.actorRef(100, OverflowStrategy.fail).map(_ => Random.nextDouble())
    val sink: Sink[Double, Future[Done]] = Sink.foreach(println)
    val actorRef: ActorRef = Flow[Double].to(sink).runWith(source)
    system.scheduler.schedule(0.micro, 1.milli, actorRef, "tick")(system.dispatcher)
    Thread.sleep(1000)
    println("done")
  }
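However, the actor appears to be stopped immediately after the stream is materialized: not a single message gets through, and the only two that are sent end up as dead letters. What am I misunderstanding here, and how do I get the expected result?
Log: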
08:24:07.077 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858]
08:24:07.078 DEBUG akka.event.EventStream - subscribing Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] to channel class akka.actor.UnhandledMessage
08:24:07.079 DEBUG akka.event.EventStream - Default Loggers started
08:24:07.079 DEBUG a.e.LoggingBus$$anonfun$startDefaultLoggers$$anon - started (akka.event.LoggingBus$$anonfun$startDefaultLoggers$$anon@26bf15e8)
08:24:07.079 DEBUG akka.event.EventStream - unsubscribing StandardOutLogger from all channels
08:24:07.080 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689]
08:24:07.080 DEBUG akka.event.EventStream - subscribing Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] to channel class akka.actor.DeadLetter
08:24:07.080 DEBUG akka.event.DeadLetterListener - started (akka.event.DeadLetterListener@691f7305)
08:24:07.081 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.081 DEBUG akka.event.EventStreamUnsubscriber - registering unsubscriber with akka.event.EventStream@a27e5b9
08:24:07.081 DEBUG akka.event.EventStream - initialized unsubscriber to: Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434], registering 3 initial subscribers with it
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - started (akka.event.EventStreamUnsubscriber@75843fb6)
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/log1-Slf4jLogger#28712451] in order to unsubscribe from EventStream when it terminates
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] in order to unsubscribe from EventStream when it terminates
08:24:07.082 DEBUG akka.event.slf4j.Slf4jLogger - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.083 DEBUG a.e.LoggingBus$$anonfun$startDefaultLoggers$$anon - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.083 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] in order to unsubscribe from EventStream when it terminates
08:24:07.083 DEBUG akka.event.DeadLetterListener - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.164 DEBUG a.a.LocalActorRefProvider$Guardian - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0#-544371192]
08:24:07.164 DEBUG akka.stream.impl.StreamSupervisor - started (akka.stream.impl.StreamSupervisor@6fd27b4e)
08:24:07.173 WARN a.stream.impl.ActorMaterializerImpl - Fuzzing mode is enabled on this system. If you see this warning on your production system then set akka.stream.materializer.debug.fuzzing-mode to off.
08:24:07.270 DEBUG akka.stream.impl.StreamSupervisor - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-0-unknown-operation#478879902]
08:24:07.276 DEBUG a.s.i.fusing.ActorGraphInterpreter - started (akka.stream.impl.fusing.ActorGraphInterpreter@42111ea7)
08:24:07.276 DEBUG akka.stream.impl.StreamSupervisor - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529]
08:24:07.283 DEBUG akka.stream.impl.ActorRefSourceActor - started (akka.stream.impl.ActorRefSourceActor@6dbd251e)
08:24:07.289 INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://BuffersAndTicksSpec/deadLetters] to Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
08:24:07.290 DEBUG akka.stream.impl.ActorRefSourceActor - stopped
08:24:07.291 DEBUG a.s.i.fusing.ActorGraphInterpreter - stopped
08:24:07.297 INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://BuffersAndTicksSpec/deadLetters] to Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
done
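The problem here is the type of your Source.actorRef. Even though an actor can receive messages of type Any, you need to give it a type when you wrap it in a Source (this is what gives Akka Streams its strong typing).
Example: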
val source: Source[Int, ActorRef] = Source.actorRef[Int](100, OverflowStrategy.fail)
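Behind the scenes, your Source will try to cast every incoming message to Int.
In your case, Source.actorRef is not explicitly typed, so the compiler infers Nothing. (This is masked by the map stage you attach, after which everything becomes a Double.) Every "tick" message you receive is cast to Nothing, which results in a ClassCastException.
The solution is to give your Source.actorRef stage an explicit type: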
val source: Source[Double, ActorRef] =
Source.actorRef[String](100, OverflowStrategy.fail).map(_ => Random.nextDouble())
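For completeness, here is a minimal standalone sketch of the corrected test, assuming the same classic Akka Streams API as the question (ActorMaterializer and the two-argument Source.actorRef). The names TickDemo, runTicks and countingSink are made up for illustration. Two things differ from the question and are my own choices, not part of the original: OverflowStrategy.dropHead is used so the demo does not fail if the sink falls behind, and Sink.fold counts the printed elements so there is something to check at the end.

```scala
import akka.actor.{ActorRef, ActorSystem, Status}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical demo object; not part of the original question or answer.
object TickDemo {

  /** Runs the stream for roughly `duration` and returns how many numbers were printed. */
  def runTicks(duration: FiniteDuration): Long = {
    implicit val system: ActorSystem = ActorSystem("TickDemo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // The explicit [String] is the fix: without it the element type is inferred as Nothing.
      // dropHead (instead of the question's fail) is an assumption made for this demo.
      val source: Source[Double, ActorRef] =
        Source.actorRef[String](100, OverflowStrategy.dropHead)
          .map(_ => Random.nextDouble())

      // Prints every element and counts it (an addition for this sketch).
      val countingSink: Sink[Double, Future[Long]] =
        Sink.fold[Long, Double](0L) { (n, d) => println(d); n + 1 }

      val (actorRef, printed) = source.toMat(countingSink)(Keep.both).run()

      // Send "tick" to the source actor every millisecond, as in the question.
      val ticks =
        system.scheduler.schedule(0.millis, 1.milli, actorRef, "tick")(system.dispatcher)
      Thread.sleep(duration.toMillis)
      ticks.cancel()

      // Status.Success completes the actorRef source so the sink's future can finish.
      actorRef ! Status.Success("done")
      Await.result(printed, 5.seconds)
    } finally {
      Await.ready(system.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    println(s"done, printed ${runTicks(1.second)} numbers")
}
```

Keeping the materialized ActorRef together with the sink's future (Keep.both) lets the test wait for the stream to finish instead of relying only on Thread.sleep.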