如何创建一个可以稍后通过方法调用接收元素的源?

How to create a Source that can receive elements later via a method call?

我想创建一个 Source 并稍后在其上推送元素,例如:

val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)

推荐的方法是什么?

谢谢!

可以通过三种方式实现:

1. Post 使用 SourceQueue 实现

您可以使用 Source.queue 将流具体化为 SourceQueue:

case class Weather(zipCode : String, temperature : Double, raining : Boolean)

val bufferSize = 100

//if the buffer fills up then this strategy drops the oldest elements
//upon the arrival of a new element.
val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val queue = Source.queue(bufferSize, overflowStrategy)
                  .filter(!_.raining)
                  .to(Sink foreach println)
                  .run() // in order to "keep" the queue Materialized value instead of the Sink's

queue offer Weather("02139", 32.0, true)

2。 Post 与 Actor 的物化

有一个类似的问答 ,要点是您将流具体化为 ActorRef 并向该 ref 发送消息:

val ref = Source.actorRef[Weather](Int.MaxValue, fail)
                .filter(!_.raining)
                .to(Sink foreach println )
                .run() // in order to "keep" the ref Materialized value instead of the Sink's

ref ! Weather("02139", 32.0, true)

3。使用 Actor

进行预实现

同样,您可以显式创建一个包含消息缓冲区的 Actor,使用该 Actor 创建一个 Source,然后按照答案 :

中的描述发送该 Actor 消息
object WeatherForwarder {
  def props : Props = Props[WeatherForwarder]
}

//see provided link for example definition
class WeatherForwarder extends Actor {...}

val actorRef = actorSystem actorOf WeatherForwarder.props 

//note the stream has not been instatiated yet
actorRef ! Weather("02139", 32.0, true) 

//stream already has 1 Weather value to process which is sitting in the 
//ActorRef's internal buffer
val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}

在尝试解决这个问题并寻找一个好的解决方案之后,我发现了这个解决方案,它干净、简单,并且在 post 物化之前都有效。

  val (ref: ActorRef, publisher: Publisher[Int]) =
    Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail)
      .toMat(Sink.asPublisher(true))(Keep.both).run()

  ref ! 1 //before

  val source = Source.fromPublisher(publisher)

  ref ! 2 //before
  Thread.sleep(1000)
  ref ! 3 //before

  source.runForeach(println)

  ref ! 4 //after
  Thread.sleep(1000)
  ref ! 5 //after

输出:

1
2
3
4
5

因为 Akka 2.5 Source 有一个 preMaterialize 方法。

根据 documentation,这看起来像指定的方法来完成您的要求:

There are situations in which you require a Source materialized value before the Source gets hooked up to the rest of the graph. This is particularly useful in the case of “materialized value powered” Sources, like Source.queue, Source.actorRef or Source.maybe.

下面是一个关于 SourceQueue 的例子。元素在具体化之前和之后以及从 Flow:

中被推送到队列中
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()


val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)
val (sourceMat, source) = sourceDecl.preMaterialize()

// Adding element before actual materialization
sourceMat.offer("pre materialization element")

val flow = Flow[String].map { e =>
  if(!e.contains("new")) {
    // Adding elements from within the flow
    sourceMat.offer("new element generated inside the flow")
  }
  s"Processing $e"
}

// Actually materializing with `run`
source.via(flow).to(Sink.foreach(println)).run()

// Adding element after materialization
sourceMat.offer("post materialization element")

输出:

Processing pre materialization element
Processing post materialization element
Processing new element generated inside the flow
Processing new element generated inside the flow