专门处理Akka流的第一个元素

Handle Akka stream's first element specially

是否有一种惯用的方式以特殊方式处理 Akka 流的 Source 第一个元素?我现在拥有的是:

    var firstHandled = false
    source.map { elem =>
      if(!firstHandled) {
        //handle specially
        firstHandled = true
      } else {
        //handle normally
      }
    }

谢谢

使用 zipWith

您可以使用仅 returns true 一次的布尔值源压缩原始 Source。然后可以处理这个压缩的源代码。

首先我们需要一个发出布尔值的源:

//true, false, false, false, ...
def firstTrueIterator() : Iterator[Boolean] = 
  (Iterator single true) ++ (Iterator continually false)

def firstTrueSource : Source[Boolean, _] = 
  Source fromIterator firstTrueIterator

然后我们可以定义一个函数来处理两种不同的情况:

type Data = ???
type OutputData = ???

def processData(data : Data, firstRun : Boolean) : OutputData = 
  if(firstRun) { ... }
  else { ... }

然后可以在 zipWith 原始来源中使用此函数:

val originalSource : Source[Data,_] = ???    

val contingentSource : Source[OutputData,_] =
  originalSource.zipWith(firstTrueSource)(processData)

使用状态流

您可以创建一个 Flow,其中包含类似于问题中示例的状态,但使用更实用的方法:

def firstRunner(firstCall : (Data) => OutputData,
                otherCalls : (Data) => OutputData) : (Data) => OutputData = {
  var firstRun = true
  (data : Data) => {
    if(firstRun) {
      firstRun = false
      firstCall(data)
    }
    else
      otherCalls(data)
  }
}//end def firstRunner

def firstRunFlow(firstCall :  (Data) => OutputData, 
                 otherCalls : (Data) => OutputData) : Flow[Data, OutputData, _] = 
  Flow[Data] map firstRunner(firstCall, otherCalls)

然后可以将此流程应用于您的原始来源:

def firstElementFunc(data : Data) : OutputData = ???
def remainingElsFunc(data : Data) : OutputData = ???

val firstSource : Source[OutputData, _] = 
  originalSource via firstRunFlow(firstElementFunc,remainingElseFunc)

"Idiomatic Way"

直接回答您的问题需要听写 "idiomatic way"。我最后回答那部分是因为它是编译器最不能验证的,因此更接近意见。我永远不会声称自己是惯用代码的有效分类器。

我对 akka-streams 的个人经验是,最好将我的观点转换为想象一个包含 Data 元素的实际流(我想到一辆有厢式车厢的火车)。我需要把它分成多个固定大小的火车吗?只有某些货车才能通过吗?我可以并排安装另一列包含 Boolean 辆可以发出前方信号的火车吗?由于我对流(火车)的看法,我更喜欢 zipWith 方法。我最初的方法总是使用连接在一起的其他流部分。

此外,我发现最好在 akka Stream 组件中嵌入尽可能少的代码。 firstTrueIteratorprocessData 完全不依赖 akka。同时,firstTrueSourcecontingentSource 定义几乎没有逻辑。这允许您独立于笨重的 ActorSystem 测试逻辑,并且可以在 Futures 或 Actors 中使用胆量。

虽然我通常会接受 Ramon 的回答,但您也可以使用 prefixAndTail, with a prefix of 1, together with flatMapConcat 来实现类似的效果:

val src = Source(List(1, 2, 3, 4, 5))
val fst = Flow[Int].map(i => s"First: $i")
val rst = Flow[Int].map(i => s"Rest:  $i")

val together = src.prefixAndTail(1).flatMapConcat { case (head, tail) =>
  // `head` is a Seq of the prefix elements, which in our case is
  // just the first one. We can convert it to a source of just
  // the first element, processed via our fst flow, and then
  // concatenate `tail`, which is the remainder...
  Source(head).via(fst).concat(tail.via(rst))
}

Await.result(together.runForeach(println), 10.seconds)
// First: 1
// Rest:  2
// Rest:  3
// Rest:  4
// Rest:  5

这当然不仅适用于第一个项目,而且适用于前 N 个项目,条件是这些项目将被作为一个严格的集合。

虽然我更喜欢使用 zip 的方法,但也可以使用 statefulMapConcat:

source
  .statefulMapConcat { _ =>
        var firstRun = true
        elem => {
          if (firstRun) {
            //first
            firstRun = false
          } else {
            //not first            
          }
        }
      }

您可以使用 prepend 将源添加到流程中。只需将单个项目源添加到流程中,在它耗尽后,原始源的其余部分将继续。

https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/prepend.html

 Source(List(1, 2, 3))
  .prepend(Source.single(0))
  .runWith(Sink.foreach(println))

0 1 2 3