专门处理Akka流的第一个元素
Handle Akka stream's first element specially
是否有一种惯用的方式以特殊方式处理 Akka 流的 Source
第一个元素?我现在拥有的是:
var firstHandled = false
source.map { elem =>
if(!firstHandled) {
//handle specially
firstHandled = true
} else {
//handle normally
}
}
谢谢
使用 zipWith
您可以使用仅 returns true
一次的布尔值源压缩原始 Source
。然后可以处理这个压缩的源代码。
首先我们需要一个发出布尔值的源:
//true, false, false, false, ...
def firstTrueIterator() : Iterator[Boolean] =
(Iterator single true) ++ (Iterator continually false)
def firstTrueSource : Source[Boolean, _] =
Source fromIterator firstTrueIterator
然后我们可以定义一个函数来处理两种不同的情况:
type Data = ???
type OutputData = ???
def processData(data : Data, firstRun : Boolean) : OutputData =
if(firstRun) { ... }
else { ... }
然后可以在 zipWith
原始来源中使用此函数:
val originalSource : Source[Data,_] = ???
val contingentSource : Source[OutputData,_] =
originalSource.zipWith(firstTrueSource)(processData)
使用状态流
您可以创建一个 Flow
,其中包含类似于问题中示例的状态,但使用更实用的方法:
def firstRunner(firstCall : (Data) => OutputData,
otherCalls : (Data) => OutputData) : (Data) => OutputData = {
var firstRun = true
(data : Data) => {
if(firstRun) {
firstRun = false
firstCall(data)
}
else
otherCalls(data)
}
}//end def firstRunner
def firstRunFlow(firstCall : (Data) => OutputData,
otherCalls : (Data) => OutputData) : Flow[Data, OutputData, _] =
Flow[Data] map firstRunner(firstCall, otherCalls)
然后可以将此流程应用于您的原始来源:
def firstElementFunc(data : Data) : OutputData = ???
def remainingElsFunc(data : Data) : OutputData = ???
val firstSource : Source[OutputData, _] =
originalSource via firstRunFlow(firstElementFunc,remainingElseFunc)
"Idiomatic Way"
直接回答您的问题需要听写 "idiomatic way"。我最后回答那部分是因为它是编译器最不能验证的,因此更接近意见。我永远不会声称自己是惯用代码的有效分类器。
我对 akka-streams 的个人经验是,最好将我的观点转换为想象一个包含 Data
元素的实际流(我想到一辆有厢式车厢的火车)。我需要把它分成多个固定大小的火车吗?只有某些货车才能通过吗?我可以并排安装另一列包含 Boolean
辆可以发出前方信号的火车吗?由于我对流(火车)的看法,我更喜欢 zipWith 方法。我最初的方法总是使用连接在一起的其他流部分。
此外,我发现最好在 akka Stream 组件中嵌入尽可能少的代码。 firstTrueIterator
和 processData
完全不依赖 akka。同时,firstTrueSource
和 contingentSource
定义几乎没有逻辑。这允许您独立于笨重的 ActorSystem 测试逻辑,并且可以在 Futures 或 Actors 中使用胆量。
虽然我通常会接受 Ramon 的回答,但您也可以使用 prefixAndTail
, with a prefix of 1, together with flatMapConcat
来实现类似的效果:
val src = Source(List(1, 2, 3, 4, 5))
val fst = Flow[Int].map(i => s"First: $i")
val rst = Flow[Int].map(i => s"Rest: $i")
val together = src.prefixAndTail(1).flatMapConcat { case (head, tail) =>
// `head` is a Seq of the prefix elements, which in our case is
// just the first one. We can convert it to a source of just
// the first element, processed via our fst flow, and then
// concatenate `tail`, which is the remainder...
Source(head).via(fst).concat(tail.via(rst))
}
Await.result(together.runForeach(println), 10.seconds)
// First: 1
// Rest: 2
// Rest: 3
// Rest: 4
// Rest: 5
这当然不仅适用于第一个项目,而且适用于前 N 个项目,条件是这些项目将被作为一个严格的集合。
虽然我更喜欢使用 zip 的方法,但也可以使用 statefulMapConcat
:
source
.statefulMapConcat { _ =>
var firstRun = true
elem => {
if (firstRun) {
//first
firstRun = false
} else {
//not first
}
}
}
您可以使用 prepend
将源添加到流程中。只需将单个项目源添加到流程中,在它耗尽后,原始源的其余部分将继续。
https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/prepend.html
Source(List(1, 2, 3))
.prepend(Source.single(0))
.runWith(Sink.foreach(println))
0
1
2
3
是否有一种惯用的方式以特殊方式处理 Akka 流的 Source
第一个元素?我现在拥有的是:
var firstHandled = false
source.map { elem =>
if(!firstHandled) {
//handle specially
firstHandled = true
} else {
//handle normally
}
}
谢谢
使用 zipWith
您可以使用仅 returns true
一次的布尔值源压缩原始 Source
。然后可以处理这个压缩的源代码。
首先我们需要一个发出布尔值的源:
//true, false, false, false, ...
def firstTrueIterator() : Iterator[Boolean] =
(Iterator single true) ++ (Iterator continually false)
def firstTrueSource : Source[Boolean, _] =
Source fromIterator firstTrueIterator
然后我们可以定义一个函数来处理两种不同的情况:
type Data = ???
type OutputData = ???
def processData(data : Data, firstRun : Boolean) : OutputData =
if(firstRun) { ... }
else { ... }
然后可以在 zipWith
原始来源中使用此函数:
val originalSource : Source[Data,_] = ???
val contingentSource : Source[OutputData,_] =
originalSource.zipWith(firstTrueSource)(processData)
使用状态流
您可以创建一个 Flow
,其中包含类似于问题中示例的状态,但使用更实用的方法:
def firstRunner(firstCall : (Data) => OutputData,
otherCalls : (Data) => OutputData) : (Data) => OutputData = {
var firstRun = true
(data : Data) => {
if(firstRun) {
firstRun = false
firstCall(data)
}
else
otherCalls(data)
}
}//end def firstRunner
def firstRunFlow(firstCall : (Data) => OutputData,
otherCalls : (Data) => OutputData) : Flow[Data, OutputData, _] =
Flow[Data] map firstRunner(firstCall, otherCalls)
然后可以将此流程应用于您的原始来源:
def firstElementFunc(data : Data) : OutputData = ???
def remainingElsFunc(data : Data) : OutputData = ???
val firstSource : Source[OutputData, _] =
originalSource via firstRunFlow(firstElementFunc,remainingElseFunc)
"Idiomatic Way"
直接回答您的问题需要听写 "idiomatic way"。我最后回答那部分是因为它是编译器最不能验证的,因此更接近意见。我永远不会声称自己是惯用代码的有效分类器。
我对 akka-streams 的个人经验是,最好将我的观点转换为想象一个包含 Data
元素的实际流(我想到一辆有厢式车厢的火车)。我需要把它分成多个固定大小的火车吗?只有某些货车才能通过吗?我可以并排安装另一列包含 Boolean
辆可以发出前方信号的火车吗?由于我对流(火车)的看法,我更喜欢 zipWith 方法。我最初的方法总是使用连接在一起的其他流部分。
此外,我发现最好在 akka Stream 组件中嵌入尽可能少的代码。 firstTrueIterator
和 processData
完全不依赖 akka。同时,firstTrueSource
和 contingentSource
定义几乎没有逻辑。这允许您独立于笨重的 ActorSystem 测试逻辑,并且可以在 Futures 或 Actors 中使用胆量。
虽然我通常会接受 Ramon 的回答,但您也可以使用 prefixAndTail
, with a prefix of 1, together with flatMapConcat
来实现类似的效果:
val src = Source(List(1, 2, 3, 4, 5))
val fst = Flow[Int].map(i => s"First: $i")
val rst = Flow[Int].map(i => s"Rest: $i")
val together = src.prefixAndTail(1).flatMapConcat { case (head, tail) =>
// `head` is a Seq of the prefix elements, which in our case is
// just the first one. We can convert it to a source of just
// the first element, processed via our fst flow, and then
// concatenate `tail`, which is the remainder...
Source(head).via(fst).concat(tail.via(rst))
}
Await.result(together.runForeach(println), 10.seconds)
// First: 1
// Rest: 2
// Rest: 3
// Rest: 4
// Rest: 5
这当然不仅适用于第一个项目,而且适用于前 N 个项目,条件是这些项目将被作为一个严格的集合。
虽然我更喜欢使用 zip 的方法,但也可以使用 statefulMapConcat
:
source
.statefulMapConcat { _ =>
var firstRun = true
elem => {
if (firstRun) {
//first
firstRun = false
} else {
//not first
}
}
}
您可以使用 prepend
将源添加到流程中。只需将单个项目源添加到流程中,在它耗尽后,原始源的其余部分将继续。
https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/prepend.html
Source(List(1, 2, 3))
.prepend(Source.single(0))
.runWith(Sink.foreach(println))
0
1
2
3