akka流中的流问题

Flow problems in akka stream

我是 Akka 流的新手,我使用 Rx 有一段时间了,所以我非常了解所有运算符,但我不知道为什么我的管道不发出值

这是我的代码

  @Test def mainFlow(): Unit = {
    val increase = Flow[Int]
      .map(value => value * 10)
    val filterFlow = Flow[Int]
      .filter(value => value > 50)
      .take(2)
    Source(0 to 10)
      .via(increase)
      .via(filterFlow)
      .to(Sink.foreach(value => println(s"Item emitted:$value")))
      .run()
  }

第一个 Flow 转换 Source 中发出的值乘以 10,第二个 Flow 过滤器仅获取高于 50 的项目然后我只得到 2,所以我期望在 Sink 中有 60 和70 但它没有发出任何东西。

知道为什么吗?

您的流程已正确构建,并发出您提到的那 2 个元素。 我相信问题出在你的测试上。也就是说,流程 运行 是异步的,您的测试是一个普通的 Unit 过程。因此,测试不会等到流量为运行.

您需要在测试中引入一些同步来执行您的断言。一种方法是使用 ScalaTest 中的 ScalaFutures 特征,它为您提供 futureValue 方法。

val increase = Flow[Int]
  .map(value => value * 10)
val filterFlow = Flow[Int]
  .filter(value => value > 50)
  .take(2)
Source(0 to 10)
  .via(increase)
  .via(filterFlow)
  .runForeach(value => println(s"Item emitted:$value"))
  .futureValue

请注意 .to(Sink.foreach{...}).run() 不会公开您需要同步的 Future[Done]。您的代码需要更改为 .toMat(Sink.foreach{...})(Keep.right).run(),可以缩写为 .runForeach(...)

因为你说的是​​:

对于数字 1..10,将它们乘以 10,但只产生前 2 个元素,然后保留所有大于 50 的元素,然后打印它们。

此外,您的测试不会等待 RunnableFlow 完成,这通常意味着您的程序将在流有机会 运行(Akka Streams 运行 异步)之前退出。

请注意,对于您的示例,没有理由使用 GraphDSL,您的代码与:

Source(1 to 10).map(_ * 10).take(2).filter(_ > 50).runForeach(println)

但是因为它并没有真正做任何事情"meaningfully async"我认为你会更好:

(1 to 10).map(_ * 10).take(2).filter(_ > 50).foreach(println)

但话又说回来,根据代码的当前状态,它等同于以下表达式:

()