将 Source[ByteString, NotUsed] 值打印到控制台

Print Source[ByteString, NotUsed] values to console

如何在控制台中打印源的值。

val someSource = Source.single(ByteString("SomeValue"))

我想从此来源打印字符串“SomeValue”。我试过了:

someSource.to(Sink.foreach(println)) //This one prints RunnableGraph object

someSource.map(each => {
    val pqr = each.decodeString(ByteString.UTF_8)
    print(pqr)
}) // THis one prints res3: soneSource.Repr[Unit]  = Source(SourceShape(Map.out(169373838)))

如何打印最初用于创建单个对象源的原始字符串。

从问题中的内容来看,我认为您可能正在使用 Scala 控制台或 Scala 工作表。

在 Scala 控制台或工作集中,它打印当前语句中创建的事物的表示。例如,

scala> val i = 5
val i: Int = 5

scala> val s = "ssfdf"
val s: String = ssfdf

但是,当你在这里使用 println 之类的东西时会发生什么,

scala> val u = println("dfsd")
dfsd
val u: Unit = ()

它还执行 println,然后打印出由该 println 创建的值 u 实际上是一个 Unit.

这可能就是您的困惑所在,因为您在 Sink.foreach 中的 println 在这种情况下不起作用。

这是因为这种情况更像是下面的情况,您实际上是在定义一个函数。

scala> val f1 = (s: String) => println(s)
val f1: String => Unit = $Lambda62/0x0000000800689840@1796b2d4

您在这里没有使用 println,您只是在定义一个函数(String => UnitFunction1[String, Unit] 的一个实例),它将使用 println.

因此,控制台仅打印出此处创建的值 f1 的类型为 String => Unit.

您需要调用此函数才能实际执行 println,

scala> f1.apply("dsfsd")
dsfsd

类似地,someSource.to(Sink.foreach(println)) 将创建一个类型为 RunnableGraph 的值,因此 scala 控制台将打印类似 val res0: RunnableGraph....

的内容

您现在需要 运行 这个图表来实际执行它。

但与前面的函数示例相比,图形的执行是在线程池上异步发生的,这意味着它可能无法在某些版本的 Scala 控制台或工作集上运行(取决于线程池生命周期的管理方式) .所以,如果你这样做,

scala> val someSource = Source.single(ByteString("SomeValue"))
val someSource: akka.stream.scaladsl.Source[akka.util.ByteString,akka.NotUsed] = Source(SourceShape(single.out(369296388)))

scala> val runnableGraph = someSource.to(Sink.foreach(println))
val runnableGraph: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = RunnableGraph

scala> runnableGraph.run()

如果有效,您将看到以下内容,

scala> runnableGraph.run()
val res0: akka.NotUsed = NotUsed
ByteString(83, 111, 109, 101, 86, 97, 108, 117, 101)

但您可能会看到一些与控制台相关的错误,由于某些原因无法完成图表 运行。

您实际上需要具体化 Sink,这将在 运行 图表上生成 Future[Done]。然后你将不得不使用 Await.

等待那个 Future[Done]

您必须将所有这些放入普通的 Scala 文件中并作为 Scala 应用程序执行。

import akka.{Done, actor}
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object TestAkkaStream extends App {

  val actorSystem = ActorSystem(Behaviors.empty, "test-stream-system")

  implicit val classicActorSystem = actorSystem.classicSystem

  val someSource = Source.single(ByteString("SomeValue"))

  val runnableGraph = someSource.toMat(Sink.foreach(println))(Keep.right)

  val graphRunDoneFuture: Future[Done] = runnableGraph.run()

  Await.result(graphRunDoneFuture, Duration.Inf)
}