Akka Streams:通过 alsoTo 将元素复制到另一个流
Akka Streams: Copy element to another stream via alsoTo
我想使用 alsoTo
将元素从一个 Source
复制到另一个 Source
,但它没有像我预期的那样工作。从 Java InputStream
创建 Akka Source
并进行一些转换并使用 alsoTo
创建 s1
副本的代码示例:
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source, StreamConverters}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
object Main {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("AkkaStreams_alsoTo")
implicit val executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
val byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8))
try {
val s1: Source[List[List[String]], Future[IOResult]] = StreamConverters.fromInputStream(() => byteStream)
.map { bs =>
val rows = bs.utf8String.split("\n").toList
val valuesPerRow = rows.map(row => row.split(",").toList)
valuesPerRow
}
// A copy of s1?
val s2: Source[List[List[String]], Future[IOResult]] = s1.alsoTo(Sink.collection)
println("s1.runForeach: ")
Await.result(s1.runForeach(println), 20.seconds)
println("s2.runForeach: ")
Await.result(s2.runForeach(println), 20.seconds)
println("Done")
system.terminate()
}
finally {
byteStream.close()
}
}
}
它产生以下输出:
s1.runForeach:
List(List(a, b, c), List(d, e, f), List(g, h, j))
s2.runForeach:
Done
如您所见,s2.runForeach
不打印任何元素。这种行为的原因是什么——是因为读取 Java InputStream
时的副作用吗?
我正在使用 Akka Streams v2.6.8。
I want to use alsoTo
to copy the elements from one Source
to another, but it does not work as I'm expecting.
alsoTo
不会将元素从 Source
复制到另一个 Source
;它有效地从 Source
/Flow
复制元素并将它们发送到另一个 Sink
(alsoTo
方法的参数)。因此你的期望是不正确的。
As you can see, s2.runForeach
doesn't print any elements. What is the reason of such behavior--is it because of a side effect when it reads the Java InputStream
?
因为 byteStream
是一个 val
,s1.runForeach(println)
和 s2.runForeach(println)
都“共享”这个实例,即使它们是 两个不同的 Akka Stream 蓝图。因此,当 s1.runForeach(println)
被调用时, byteStream
被消耗,并且当 s2.runForeach(println)
之后被执行时, InputStream
中没有任何内容可供 s2.runForeach(println)
打印。
将byteStream
改为def
,打印如下:
s1.runForeach:
List(List(a, b, c), List(d, e, f), List(g, h, j))
s2.runForeach:
List(List(a, b, c), List(d, e, f), List(g, h, j))
Done
这解释了为什么 s2.runForeach(println)
在这种特殊情况下不打印任何内容,但它并没有真正显示 alsoTo
的实际效果。您的设置存在缺陷,因为 s2.runForeach(println)
仅打印 Source
中的元素并忽略 alsoTo(Sink.collection)
.
的物化值
查看 alsoTo
行为的一种简单方法如下:
val byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8))
val s1: Source[List[List[String]], Future[IOResult]] =
StreamConverters.fromInputStream(() => byteStream)
.map { bs =>
val rows = bs.utf8String.split("\n").toList
val valuesPerRow = rows.map(row => row.split(",").toList)
valuesPerRow
}
val stream = s1.alsoTo(Sink.foreach(println)).runWith(Sink.foreach(println))
// ^ same thing as .runForeach(println)
Await.ready(stream, 5.seconds)
println("Done")
system.terminate()
运行 以上打印如下:
List(List(a, b, c), List(d, e, f), List(g, h, j))
List(List(a, b, c), List(d, e, f), List(g, h, j))
Done
oneSource
中的元素被发送到bothSink
s.
如果你想使用Sink.collection
...
val (result1, result2) =
s1.alsoToMat(Sink.collection)(Keep.right).toMat(Sink.collection)(Keep.both).run()
// ^ note the use of alsoToMat in order to retain the materialized value
val res1 = Await.result(result1, 5.seconds)
val res2 = Await.result(result2, 5.seconds)
println(s"res1: $res1")
println(s"res2: $res2")
println("Done")
system.terminate()
...打印...
res1: List(List(List(a, b, c), List(d, e, f), List(g, h, j)))
res2: List(List(List(a, b, c), List(d, e, f), List(g, h, j)))
Done
同样,一个 Source
中的元素被发送到两个 Sink
中。
我想使用 alsoTo
将元素从一个 Source
复制到另一个 Source
,但它没有像我预期的那样工作。从 Java InputStream
创建 Akka Source
并进行一些转换并使用 alsoTo
创建 s1
副本的代码示例:
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source, StreamConverters}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
object Main {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("AkkaStreams_alsoTo")
implicit val executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
val byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8))
try {
val s1: Source[List[List[String]], Future[IOResult]] = StreamConverters.fromInputStream(() => byteStream)
.map { bs =>
val rows = bs.utf8String.split("\n").toList
val valuesPerRow = rows.map(row => row.split(",").toList)
valuesPerRow
}
// A copy of s1?
val s2: Source[List[List[String]], Future[IOResult]] = s1.alsoTo(Sink.collection)
println("s1.runForeach: ")
Await.result(s1.runForeach(println), 20.seconds)
println("s2.runForeach: ")
Await.result(s2.runForeach(println), 20.seconds)
println("Done")
system.terminate()
}
finally {
byteStream.close()
}
}
}
它产生以下输出:
s1.runForeach:
List(List(a, b, c), List(d, e, f), List(g, h, j))
s2.runForeach:
Done
如您所见,s2.runForeach
不打印任何元素。这种行为的原因是什么——是因为读取 Java InputStream
时的副作用吗?
我正在使用 Akka Streams v2.6.8。
I want to use
alsoTo
to copy the elements from oneSource
to another, but it does not work as I'm expecting.
alsoTo
不会将元素从 Source
复制到另一个 Source
;它有效地从 Source
/Flow
复制元素并将它们发送到另一个 Sink
(alsoTo
方法的参数)。因此你的期望是不正确的。
As you can see,
s2.runForeach
doesn't print any elements. What is the reason of such behavior--is it because of a side effect when it reads the JavaInputStream
?
因为 byteStream
是一个 val
,s1.runForeach(println)
和 s2.runForeach(println)
都“共享”这个实例,即使它们是 两个不同的 Akka Stream 蓝图。因此,当 s1.runForeach(println)
被调用时, byteStream
被消耗,并且当 s2.runForeach(println)
之后被执行时, InputStream
中没有任何内容可供 s2.runForeach(println)
打印。
将byteStream
改为def
,打印如下:
s1.runForeach:
List(List(a, b, c), List(d, e, f), List(g, h, j))
s2.runForeach:
List(List(a, b, c), List(d, e, f), List(g, h, j))
Done
这解释了为什么 s2.runForeach(println)
在这种特殊情况下不打印任何内容,但它并没有真正显示 alsoTo
的实际效果。您的设置存在缺陷,因为 s2.runForeach(println)
仅打印 Source
中的元素并忽略 alsoTo(Sink.collection)
.
查看 alsoTo
行为的一种简单方法如下:
val byteStream = new ByteArrayInputStream("a,b,c\nd,e,f\ng,h,j".getBytes(StandardCharsets.UTF_8))
val s1: Source[List[List[String]], Future[IOResult]] =
StreamConverters.fromInputStream(() => byteStream)
.map { bs =>
val rows = bs.utf8String.split("\n").toList
val valuesPerRow = rows.map(row => row.split(",").toList)
valuesPerRow
}
val stream = s1.alsoTo(Sink.foreach(println)).runWith(Sink.foreach(println))
// ^ same thing as .runForeach(println)
Await.ready(stream, 5.seconds)
println("Done")
system.terminate()
运行 以上打印如下:
List(List(a, b, c), List(d, e, f), List(g, h, j))
List(List(a, b, c), List(d, e, f), List(g, h, j))
Done
oneSource
中的元素被发送到bothSink
s.
如果你想使用Sink.collection
...
val (result1, result2) =
s1.alsoToMat(Sink.collection)(Keep.right).toMat(Sink.collection)(Keep.both).run()
// ^ note the use of alsoToMat in order to retain the materialized value
val res1 = Await.result(result1, 5.seconds)
val res2 = Await.result(result2, 5.seconds)
println(s"res1: $res1")
println(s"res2: $res2")
println("Done")
system.terminate()
...打印...
res1: List(List(List(a, b, c), List(d, e, f), List(g, h, j)))
res2: List(List(List(a, b, c), List(d, e, f), List(g, h, j)))
Done
同样,一个 Source
中的元素被发送到两个 Sink
中。