像 Stream,Source from function?

Akka Stream, Source from function?

我想要一个 Source 以给定的时间间隔评估函数并发出其输出。作为解决方法,我可以使用 Source.queue + offer 来完成,但还没有找到更简洁的方法。理想情况下我会有类似

的东西
def myFunction() = ....                     // function with side-effects 
Source.tick(1.second, 1.second, myFunction) // myFunction is evaluated at every tick

有什么想法吗?

可能最干净的方法是使用 map

Source.tick(1.second, 1.second, NotUsed).map(_ ⇒ myFunction())

我想,throttle 就是您所需要的。 Source 应用于 iterable 的完全可运行示例,它使用 next():

中的函数
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ThrottleMode.Shaping
import akka.stream.scaladsl.Source

import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
var i = 0

def myFunction(): Int = {
  i = i + 1
  i
}

import scala.collection.immutable.Iterable

val x: Iterable[Int] = new Iterable[Int] {
  override def iterator: Iterator[Int] =
    new Iterator[Int] {
      override def hasNext: Boolean = true

      override def next(): Int = myFunction()
    }
}
Source(x).throttle(1, 1.second, 1, Shaping).runForeach(println)

throttle 参数:节流源,每 1 秒 1 个元素,最大突发 = 1,在发出消息之前暂停以达到节流率(即 Shaping 的目的)。