访问由 Source.actorRef 创建的 akka 流源的底层 ActorRef

Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef

我正在尝试使用 Source.actorRef method to create an akka.stream.scaladsl.Source 对象。某种形式

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...

我的问题是:如何将数据发送到基于 ActorRef 的源对象

我假设向源发送消息是某种形式

//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)

但是 weatherSource 没有 ! 运算符或 tell 方法。

documentation 并没有详细描述如何使用 Source.actorRef,它只是说你可以...

提前感谢您的审核和回复。

您需要 Flow:

  import akka.stream.OverflowStrategy.fail
  import akka.stream.scaladsl.Source
  import akka.stream.scaladsl.{Sink, Flow}

  case class Weather(zip : String, temp : Double, raining : Boolean)

  val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

  val sunnySource = weatherSource.filter(!_.raining)

  val ref = Flow[Weather]
    .to(Sink.ignore)
    .runWith(sunnySource)

  ref ! Weather("02139", 32.0, true)

记住这一切都是实验性的,可能会改变!

正如@Noah 指出的 akka-streams 的实验性质,他的回答可能不适用于 1.0 版本。我必须遵循 this example:

给出的示例
implicit val materializer = ActorMaterializer()
val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run()
actorRef ! TweetInfo(...)
val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher)

与所有 'materialized values' 一样,ActorRef 的实例只有在整个流具体化后才能访问,换句话说,当 RunnableGraph 运行.

// RunnableGraph[ActorRef] means that you get ActorRef when you run the graph
val rg1: RunnableGraph[ActorRef] = sunnySource.to(Sink.foreach(println))

// You get ActorRef instance as a materialized value
val actorRef1: ActorRef = rg1.run()

// Or even more correct way: to materialize both ActorRef and future to completion 
// of the stream, so that we know when we are done:

// RunnableGraph[(ActorRef, Future[Done])] means that you get tuple
// (ActorRef, Future[Done]) when you run the graph
val rg2: RunnableGraph[(ActorRef, Future[Done])] =
  sunnySource.toMat(Sink.foreach(println))(Keep.both)

// You get both ActorRef and Future[Done] instances as materialized values
val (actorRef2, future) = rg2.run()

actorRef2 ! Weather("90210", 72.0, false)
actorRef2 ! Weather("02139", 32.0, true)
actorRef2 ! akka.actor.Status.Success("Done!") // Complete the stream
future onComplete { /* ... */ }