如何动态添加元素到Source?
How to add elements to Source dynamically?
我有生成未绑定源并使用它的示例代码:
主要对象{
def main(args : Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
val source: Source[String] = Source(() => {
Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)})
})
source.runForeach((item:String) => { println(item) })
.onComplete{ _ => system.shutdown() }
}
}
我想创建 class 实现:
trait MySources {
def addToSource(item: String)
def getSource() : Source[String]
}
而且我需要在多线程中使用它,例如:
class MyThread(mySources: MySources) extends Thread {
override def run(): Unit = {
for(i <- 1 to 1000000) { // here will be infinite loop
mySources.addToSource(i.toString)
}
}
}
以及预期的完整代码:
object Main {
def main(args : Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
val sources = new MySourcesImplementation()
for(i <- 1 to 100) {
(new MyThread(sources)).start()
}
val source = sources.getSource()
source.runForeach((item:String) => { println(item) })
.onComplete{ _ => system.shutdown() }
}
}
如何实现MySources
?
获得非有限来源的一种方法是使用一种特殊类型的演员作为来源,一种混合了 ActorPublisher
特征的演员。如果您创建其中一种参与者,然后调用 ActorPublisher.apply
,您最终会得到一个 Reactive Streams Publisher
实例,您可以使用 apply
来自Source
从中生成 Source
。之后,您只需要确保您的 ActorPublisher
class 正确处理用于向下游发送元素的 Reactive Streams 协议,您就可以开始了。一个非常简单的例子如下:
import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
object DynamicSourceExample extends App{
implicit val system = ActorSystem("test")
implicit val materializer = ActorFlowMaterializer()
val actorRef = system.actorOf(Props[ActorBasedSource])
val pub = ActorPublisher[Int](actorRef)
Source(pub).
map(_ * 2).
runWith(Sink.foreach(println))
for(i <- 1 until 20){
actorRef ! i.toString
Thread.sleep(1000)
}
}
class ActorBasedSource extends Actor with ActorPublisher[Int]{
import ActorPublisherMessage._
var items:List[Int] = List.empty
def receive = {
case s:String =>
if (totalDemand == 0)
items = items :+ s.toInt
else
onNext(s.toInt)
case Request(demand) =>
if (demand > items.size){
items foreach (onNext)
items = List.empty
}
else{
val (send, keep) = items.splitAt(demand.toInt)
items = keep
send foreach (onNext)
}
case other =>
println(s"got other $other")
}
}
在 Akka Streams 2 中,您可以使用 sourceQueue:
正如我在 , the SourceQueue
is the way to go, and since Akka 2.5 there is a handy method preMaterialize
中提到的那样,无需先创建复合源。
我在.
中举了一个例子
我有生成未绑定源并使用它的示例代码:
主要对象{
def main(args : Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
val source: Source[String] = Source(() => {
Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)})
})
source.runForeach((item:String) => { println(item) })
.onComplete{ _ => system.shutdown() }
}
}
我想创建 class 实现:
trait MySources {
def addToSource(item: String)
def getSource() : Source[String]
}
而且我需要在多线程中使用它,例如:
class MyThread(mySources: MySources) extends Thread {
override def run(): Unit = {
for(i <- 1 to 1000000) { // here will be infinite loop
mySources.addToSource(i.toString)
}
}
}
以及预期的完整代码:
object Main {
def main(args : Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
val sources = new MySourcesImplementation()
for(i <- 1 to 100) {
(new MyThread(sources)).start()
}
val source = sources.getSource()
source.runForeach((item:String) => { println(item) })
.onComplete{ _ => system.shutdown() }
}
}
如何实现MySources
?
获得非有限来源的一种方法是使用一种特殊类型的演员作为来源,一种混合了 ActorPublisher
特征的演员。如果您创建其中一种参与者,然后调用 ActorPublisher.apply
,您最终会得到一个 Reactive Streams Publisher
实例,您可以使用 apply
来自Source
从中生成 Source
。之后,您只需要确保您的 ActorPublisher
class 正确处理用于向下游发送元素的 Reactive Streams 协议,您就可以开始了。一个非常简单的例子如下:
import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
object DynamicSourceExample extends App{
implicit val system = ActorSystem("test")
implicit val materializer = ActorFlowMaterializer()
val actorRef = system.actorOf(Props[ActorBasedSource])
val pub = ActorPublisher[Int](actorRef)
Source(pub).
map(_ * 2).
runWith(Sink.foreach(println))
for(i <- 1 until 20){
actorRef ! i.toString
Thread.sleep(1000)
}
}
class ActorBasedSource extends Actor with ActorPublisher[Int]{
import ActorPublisherMessage._
var items:List[Int] = List.empty
def receive = {
case s:String =>
if (totalDemand == 0)
items = items :+ s.toInt
else
onNext(s.toInt)
case Request(demand) =>
if (demand > items.size){
items foreach (onNext)
items = List.empty
}
else{
val (send, keep) = items.splitAt(demand.toInt)
items = keep
send foreach (onNext)
}
case other =>
println(s"got other $other")
}
}
在 Akka Streams 2 中,您可以使用 sourceQueue:
正如我在 SourceQueue
is the way to go, and since Akka 2.5 there is a handy method preMaterialize
中提到的那样,无需先创建复合源。
我在