与 Akka Streams 同步反馈

Synchronized Feedback with Akka Streams

我想要实现的是用 akka 流实现类似同步反馈循环的东西。

假设您有 Flow[Int].filter(_ % 5 == 0)。当您将 Int 的流广播到此流并将元组直接压缩到它后面时,您会得到类似

的内容
(0,0)
(5,1)
(10,2)

有没有办法发出一个 Option[Int],它表示流是否在我推送下一个元素后发出了一个元素?

(Some(0),0)
(None, 1)
(None, 2)
(None, 3)
(None, 4)
(Some(5), 5)
(None, 6)
...

我想过实现我自己的DetachedStage的在Flow的前后保持一个状态,每当流量拉上前的舞台,我就知道他需要下一个元素.当后面的stage没有收到元素时,就是None。

遗憾的是,结果并不好,差了很多位置。

边注

过滤器流只是一个例子,它可能是一个很长的流,我无法提供在其中的每个阶段发出 Option 的能力,所以我真的必须知道,流是推送下一个还是没有从下游请求下一个

我也玩过conflateexpand,但是这些我们在结果的位置偏移方面更糟

我在配置中更改的一件事是流的 initialmax 缓冲区,因此我可以确定指示的需求确实是在我推送的元素之后。

如果能得到一些关于如何解决这个问题的建议,那就太好了!

我无法提供您想要的内容。但我可以骗取你正在寻找的 Future,例如:

(Future(Some(0)), 0)
(Future(None)   , 1)
(Future(None)   , 2)
...

扩展您的示例,如果给定一个无法更改的流程:

val flow = Flow[Int].filter(_ % 5 == 0)

然后可以在单个输入上评估此流程,并将结果转换为 Option:

import scala.concurrent.{Future, Promise}
import akka.stream.{Materializer, ActorMaterializer}
import akka.stream.scaladsl.{Source,Sink}

def evalFlow(in : Int, flow : Flow[Int, Int, _])(implicit mat : Materializer, ec : ExecutionContext) = {
  val fut : Future[Int] = 
    Source.single(in)
          .via(flow)
          .runWith(Sink.head) //Throws an Exception if filter fails

  fut.map(Some(_))              //       val => Some(val)
     .fallbackTo(Promise(None)) // Exception => None
} 

这个函数returns一个Future[Option[Int]]。然后我们可以使用评估来简单地将结果与输入结合起来:

def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
  (evalFlow(in, flow), in)//(Future[Option[Int]], Int)

最后,evalAndCombine 函数可以放在你的整数源之后:

import akka.actor.ActorSystem

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher

val exampleSource = Source(() => (1 to 6).toIterator)

val tupleSource = exampleSource map evalAndCombine(flow)

同样,如果您想要 Future[(Option[Int], Int)] 而不是 (Future[Option[Int]], Int),例如:

Future[(Some(0), 0)]
Future[(None   , 1)]
...

然后稍微修改combine函数:

def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
  evalFlow(in, flow) map (option => (option, in))//Future[(Option[Int], Int)]