Akka Streams:通过 alsoTo 将元素复制到另一个流

Akka Streams: Copy element to another stream via alsoTo

我想使用 alsoTo 将元素从一个 Source 复制到另一个 Source,但它没有像我预期的那样工作。从 Java InputStream 创建 Akka Source 并进行一些转换并使用 alsoTo 创建 s1 副本的代码示例:

import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source, StreamConverters}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object Main {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("AkkaStreams_alsoTo")
    implicit val executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

    val byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8))
    try {
      val s1: Source[List[List[String]], Future[IOResult]] = StreamConverters.fromInputStream(() => byteStream)
        .map { bs =>
          val rows = bs.utf8String.split("\n").toList
          val valuesPerRow = rows.map(row => row.split(",").toList)
          valuesPerRow
        }
      // A copy of s1?
      val s2: Source[List[List[String]], Future[IOResult]] = s1.alsoTo(Sink.collection)
      println("s1.runForeach: ")
      Await.result(s1.runForeach(println), 20.seconds)

      println("s2.runForeach: ")
      Await.result(s2.runForeach(println), 20.seconds)

      println("Done")
      system.terminate()
    }
    finally {
      byteStream.close()
    }
  }
}

它产生以下输出:

s1.runForeach: 
List(List(a, b, c), List(d, e, f), List(g, h, j))
s2.runForeach: 
Done

如您所见,s2.runForeach 不打印任何元素。这种行为的原因是什么——是因为读取 Java InputStream 时的副作用吗?

我正在使用 Akka Streams v2.6.8。

I want to use alsoTo to copy the elements from one Source to another, but it does not work as I'm expecting.

alsoTo 不会将元素从 Source 复制到另一个 Source;它有效地从 Source/Flow 复制元素并将它们发送到另一个 SinkalsoTo 方法的参数)。因此你的期望是不正确的。

As you can see, s2.runForeach doesn't print any elements. What is the reason of such behavior--is it because of a side effect when it reads the Java InputStream?

因为 byteStream 是一个 vals1.runForeach(println)s2.runForeach(println) 都“共享”这个实例,即使它们是 两个不同的 Akka Stream 蓝图。因此,当 s1.runForeach(println) 被调用时, byteStream 被消耗,并且当 s2.runForeach(println) 之后被执行时, InputStream 中没有任何内容可供 s2.runForeach(println) 打印。

byteStream改为def,打印如下:

s1.runForeach: 
List(List(a, b, c), List(d, e, f), List(g, h, j))
s2.runForeach: 
List(List(a, b, c), List(d, e, f), List(g, h, j))
Done

这解释了为什么 s2.runForeach(println) 在这种特殊情况下不打印任何内容,但它并没有真正显示 alsoTo 的实际效果。您的设置存在缺陷,因为 s2.runForeach(println) 仅打印 Source 中的元素并忽略 alsoTo(Sink.collection).

的物化值

查看 alsoTo 行为的一种简单方法如下:

val byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8))

val s1: Source[List[List[String]], Future[IOResult]] =
  StreamConverters.fromInputStream(() => byteStream)
    .map { bs =>
      val rows = bs.utf8String.split("\n").toList
      val valuesPerRow = rows.map(row => row.split(",").toList)
      valuesPerRow
    }

val stream = s1.alsoTo(Sink.foreach(println)).runWith(Sink.foreach(println))
                                          // ^ same thing as .runForeach(println)
Await.ready(stream, 5.seconds)
println("Done")
system.terminate()

运行 以上打印如下:

List(List(a, b, c), List(d, e, f), List(g, h, j))
List(List(a, b, c), List(d, e, f), List(g, h, j))
Done

oneSource中的元素被发送到bothSinks.

如果你想使用Sink.collection...

val (result1, result2) =
  s1.alsoToMat(Sink.collection)(Keep.right).toMat(Sink.collection)(Keep.both).run()
 // ^ note the use of alsoToMat in order to retain the materialized value

val res1 = Await.result(result1, 5.seconds)
val res2 = Await.result(result2, 5.seconds)
println(s"res1: $res1")
println(s"res2: $res2")
println("Done")
system.terminate()

...打印...

res1: List(List(List(a, b, c), List(d, e, f), List(g, h, j)))
res2: List(List(List(a, b, c), List(d, e, f), List(g, h, j)))
Done

同样,一个 Source 中的元素被发送到两个 Sink 中。