使用 Akka 流加载栏

Load bar with Akka streaming

Akka 流式传输是否可以有某种加载栏? 我正在寻找可以提供源的进步状态的东西。

Source via loadingBar(expectedElment) via someThingElse to Sink

其中 expectedElement 是达到“100%”时应通过的元素数。

如果您希望从流外监控进度,那么我建议使用 Agent。代理将存储 运行 计数:

import akka.agent.Agent
import akka.stream.scaladsl.Flow

def runningCountFlow[T](agent : Agent[Float]) = Flow[T].map { val =>
  agent send (_ + 1.0f)
  val
} 

然后您可以在流外监控代理:

import scala.concurrent.ExecutionContext.Implicits.global

val expectedCount = 42.toFloat
val countAgent = Agent(0.0f)

type DataType = ???

val stream = 
  Source via runningCountFlow[DataType](countAgent) via someThingElse runWith Sink

Thread.sleep(5000) //let the stream run for a while

val percentComplete = countAgent.get / expectedCount

println(s"stream is $percentComplete complete")