How to create a Source that can receive elements later via a method call?
I would like to create a Source and push elements onto it later, like this:
val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
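What is the recommended way to do this?
Thanks!
There are three ways this can be achieved:
1. Post Materialization with SourceQueue
You can use Source.queue, which materializes the Flow into a SourceQueue: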
case class Weather(zipCode : String, temperature : Double, raining : Boolean)
val bufferSize = 100
//if the buffer fills up then this strategy drops the oldest elements
//upon the arrival of a new element.
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val queue = Source.queue[Weather](bufferSize, overflowStrategy)
.filter(!_.raining)
.to(Sink foreach println)
.run() // in order to "keep" the queue Materialized value instead of the Sink's
queue offer Weather("02139", 32.0, true)
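Since offer returns a Future[QueueOfferResult], the caller can check whether each element was actually accepted. The following is a minimal sketch of that, assuming Akka 2.5.x with the classic ActorMaterializer; the object name QueueOfferSketch, the helper run, and the second zip code are hypothetical and only there for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object QueueOfferSketch {
  final case class Weather(zipCode: String, temperature: Double, raining: Boolean)

  // Hypothetical helper: returns the offer results and the elements that reached the sink.
  def run(): (immutable.Seq[QueueOfferResult], immutable.Seq[Weather]) = {
    implicit val system: ActorSystem = ActorSystem("QueueOfferSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Keep.both retains the queue (left) and the Future of collected elements (right).
      val (queue, collected) =
        Source.queue[Weather](100, OverflowStrategy.dropHead)
          .filter(!_.raining)
          .toMat(Sink.seq[Weather])(Keep.both)
          .run()

      // Each offer yields a Future[QueueOfferResult]; blocking here only keeps the sketch short.
      val first = Await.result(queue.offer(Weather("02139", 32.0, raining = true)), 3.seconds)
      val second = Await.result(queue.offer(Weather("10001", 40.0, raining = false)), 3.seconds)

      // Completing the queue lets buffered elements drain and then completes the stream.
      queue.complete()
      (List(first, second), Await.result(collected, 3.seconds))
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Both offers are accepted by the queue, but the raining element is removed later by the filter, so an accepted offer only means the element entered the buffer, not that it reached the sink.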
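2. Post Materialization with Actor
A similar question and answer covers this; the gist is that you materialize the stream as an ActorRef and send messages to that ref:
val ref = Source.actorRef[Weather](Int.MaxValue, akka.stream.OverflowStrategy.fail)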
.filter(!_.raining)
.to(Sink foreach println )
.run() // in order to "keep" the ref Materialized value instead of the Sink's
ref ! Weather("02139", 32.0, true)
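A stream fed through an ActorRef also needs a way to be finished. The sketch below assumes the two-argument Source.actorRef from Akka 2.5.x (deprecated in later versions), where sending akka.actor.Status.Success to the ref completes the stream after already buffered elements have been emitted. The object name ActorRefCompletionSketch, the buffer size of 16, and the second zip code are hypothetical.

```scala
import akka.actor.{ActorSystem, Status}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object ActorRefCompletionSketch {
  final case class Weather(zipCode: String, temperature: Double, raining: Boolean)

  // Hypothetical helper: returns the elements that reached the sink.
  def run(): immutable.Seq[Weather] = {
    implicit val system: ActorSystem = ActorSystem("ActorRefCompletionSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Keep.both retains the ActorRef (left) and the Future of collected elements (right).
      val (ref, collected) =
        Source.actorRef[Weather](16, OverflowStrategy.fail)
          .filter(!_.raining)
          .toMat(Sink.seq[Weather])(Keep.both)
          .run()

      ref ! Weather("02139", 32.0, raining = true)
      ref ! Weather("10001", 40.0, raining = false)
      // Assumption (Akka 2.5.x semantics): Status.Success completes the stream
      // once the elements already in the buffer have been emitted.
      ref ! Status.Success("done")

      Await.result(collected, 3.seconds)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Unlike SourceQueue.offer, sending to the ref gives no feedback about whether an element was accepted, which is worth weighing when choosing between the two approaches.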
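3. Pre Materialization with Actor
Similarly, you could explicitly create an Actor that contains a message buffer, use that Actor to create a Source, and then send messages to that Actor, as described in the linked answer: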
object WeatherForwarder {
def props : Props = Props[WeatherForwarder]
}
//see provided link for example definition
class WeatherForwarder extends Actor {...}
val actorRef = actorSystem actorOf WeatherForwarder.props
//note the stream has not been instantiated yet
actorRef ! Weather("02139", 32.0, true)
//stream already has 1 Weather value to process which is sitting in the
//ActorRef's internal buffer
val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}
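After experimenting and looking for a good solution to this, I came across the following one, which is clean, simple, and works both pre- and post-materialization.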
val (ref: ActorRef, publisher: Publisher[Int]) =
Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail)
.toMat(Sink.asPublisher(true))(Keep.both).run()
ref ! 1 //before
val source = Source.fromPublisher(publisher)
ref ! 2 //before
Thread.sleep(1000)
ref ! 3 //before
source.runForeach(println)
ref ! 4 //after
Thread.sleep(1000)
ref ! 5 //after
Output:
1
2
3
4
5
Since Akka 2.5, Source has a preMaterialize method.
According to the documentation, this looks like the intended way to do what you ask:
There are situations in which you require a Source materialized value before the Source gets hooked up to the rest of the graph. This is particularly useful in the case of “materialized value powered” Sources, like Source.queue, Source.actorRef or Source.maybe.
Below is an example of how this works with a SourceQueue. Elements are pushed to the queue before and after materialization, as well as from within the Flow:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)
val (sourceMat, source) = sourceDecl.preMaterialize()
// Adding element before actual materialization
sourceMat.offer("pre materialization element")
val flow = Flow[String].map { e =>
if(!e.contains("new")) {
// Adding elements from within the flow
sourceMat.offer("new element generated inside the flow")
}
s"Processing $e"
}
// Actually materializing with `run`
source.via(flow).to(Sink.foreach(println)).run()
// Adding element after materialization
sourceMat.offer("post materialization element")
Output:
Processing pre materialization element
Processing post materialization element
Processing new element generated inside the flow
Processing new element generated inside the flow
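The documentation quote also names Source.actorRef as a candidate for preMaterialize. Here is a minimal sketch of that combination, assuming an Akka 2.5.x release in which preMaterialize is available; the object name PreMaterializeActorRefSketch and the buffer size of 16 are hypothetical. It uses take(3) so the stream finishes on its own once three elements have arrived.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object PreMaterializeActorRefSketch {
  // Hypothetical helper: returns the elements that reached the sink.
  def run(): immutable.Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("PreMaterializeActorRefSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // preMaterialize returns the materialized ActorRef together with a Source
      // that can be wired into the rest of the graph later.
      val (ref, source) =
        Source.actorRef[Int](16, OverflowStrategy.fail).preMaterialize()

      // Sent before the downstream graph exists; the elements wait in the buffer.
      ref ! 1
      ref ! 2

      // take(3) completes the stream after three elements, so no explicit
      // completion message is needed in this sketch.
      val collected = source.take(3).runWith(Sink.seq[Int])

      // Sent after the downstream graph has been run.
      ref ! 3

      Await.result(collected, 3.seconds)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Compared with wiring Sink.asPublisher and Source.fromPublisher by hand, this keeps the ref and the reusable Source in a single call.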