将元素放入流中并 return 一个对象

Put elements in stream and return an object

在 akka 中,我想将元素放在流中,return 一个对象。我知道这些元素可能是 运行 图表的来源。但是如何将元素和 return 对象放在 运行 时间?


import akka.actor.ActorSystem
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.Array.range
import scala.util.Success

object StreamElement {

  implicit val system = ActorSystem("StreamElement")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def main(args: Array[String]): Unit = {
    val (queue, value) = Source
    .queue[Int](10, OverflowStrategy.backpressure)
    .map(x => {
      x * x
    })
    .toMat(Sink.asPublisher(false))(Keep.both)
    .run()

    range(0, 10)
      .map(x => {
        queue.offer(x).onComplete {
          case Success(Enqueued) => {
          }

          case Success(Dropped) => {}
          case _ => {
             println("others")
          }
        }
      })
    }
}

如何获取值 returned?

实际上,您想要 return 每个元素的 int 值。 因此您可以创建流,然后每次都连接到源和接收器。


package tech.parasol.scala.akka

import akka.actor.ActorSystem
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.Array.range
import scala.util.Success

object StreamElement {

  implicit val system = ActorSystem("StreamElement")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val flow = Flow[Int]
    .buffer(16, OverflowStrategy.backpressure)
    .map(x => x * x)

  def main(args: Array[String]): Unit = {
    range(0, 10)
      .map(x => {
        Source.single(x).via(flow).runWith(Sink.head)
      }.map( v => println("v ===> " + v)
      ))
  }

}


我不清楚为什么在您的示例代码中没有将 Scala 集合作为源提供给 Stream。鉴于您已经组成了一个 Stream,其中包含要在源队列和发布者接收器中捕获的物化值,您可以使用 Source.fromPublisher 创建一个订阅者源来收集想要的值,如下所示:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()  // Not needed for Akka 2.6+

val (queue, pub) = Source
  .queue[Int](10, OverflowStrategy.backpressure)
  .map(x =>  x * x)
  .toMat(Sink.asPublisher(false))(Keep.both)
  .run()

val fromQueue = Source(0 until 10).runForeach(queue.offer(_))

val source = Source.fromPublisher(pub)

source.runForeach(x => print(x + " "))
// Output:
// 0 1 4 9 16 25 36 49 64 81