像 Stream,Source from function?
Akka Stream, Source from function?
我想要一个 Source 以给定的时间间隔评估函数并发出其输出。作为解决方法,我可以使用 Source.queue
+ offer
来完成,但还没有找到更简洁的方法。理想情况下我会有类似
的东西
def myFunction() = .... // function with side-effects
Source.tick(1.second, 1.second, myFunction) // myFunction is evaluated at every tick
有什么想法吗?
可能最干净的方法是使用 map
Source.tick(1.second, 1.second, NotUsed).map(_ ⇒ myFunction())
我想,throttle
就是您所需要的。 Source
应用于 iterable 的完全可运行示例,它使用 next()
:
中的函数
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ThrottleMode.Shaping
import akka.stream.scaladsl.Source
import scala.concurrent.duration._
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
var i = 0
def myFunction(): Int = {
i = i + 1
i
}
import scala.collection.immutable.Iterable
val x: Iterable[Int] = new Iterable[Int] {
override def iterator: Iterator[Int] =
new Iterator[Int] {
override def hasNext: Boolean = true
override def next(): Int = myFunction()
}
}
Source(x).throttle(1, 1.second, 1, Shaping).runForeach(println)
throttle
参数:节流源,每 1 秒 1 个元素,最大突发 = 1,在发出消息之前暂停以达到节流率(即 Shaping
的目的)。
我想要一个 Source 以给定的时间间隔评估函数并发出其输出。作为解决方法,我可以使用 Source.queue
+ offer
来完成,但还没有找到更简洁的方法。理想情况下我会有类似
def myFunction() = .... // function with side-effects
Source.tick(1.second, 1.second, myFunction) // myFunction is evaluated at every tick
有什么想法吗?
可能最干净的方法是使用 map
Source.tick(1.second, 1.second, NotUsed).map(_ ⇒ myFunction())
我想,throttle
就是您所需要的。 Source
应用于 iterable 的完全可运行示例,它使用 next()
:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ThrottleMode.Shaping
import akka.stream.scaladsl.Source
import scala.concurrent.duration._
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
var i = 0
def myFunction(): Int = {
i = i + 1
i
}
import scala.collection.immutable.Iterable
val x: Iterable[Int] = new Iterable[Int] {
override def iterator: Iterator[Int] =
new Iterator[Int] {
override def hasNext: Boolean = true
override def next(): Int = myFunction()
}
}
Source(x).throttle(1, 1.second, 1, Shaping).runForeach(println)
throttle
参数:节流源,每 1 秒 1 个元素,最大突发 = 1,在发出消息之前暂停以达到节流率(即 Shaping
的目的)。