Akka 流 - 如果条件为真则丢弃消息
Akka stream - drop message if condition is true
在这个例子中,我有一个 Ticker 实例流,它有一个序列属性。
我想删除所有序列号低于前一条的消息。
我可以做类似下面的事情,但它很难看。有更简单的方法吗?这个图案有名字吗?
source
.scan(TickerInOrder())((state, ticker) => TickerInOrder(state, ticker))
.collect { case TickerInOrder(Some(ticker), Some(inOrder)) if inOrder => ticker }
// ~~~~~~~~
object TickerInOrder {
def apply(state: TickerInOrder, ticker: Ticker): TickerInOrder = {
val inOrder = state.ticker match {
case Some(prev) => ticker.sequence > prev.sequence
case None => true
}
TickerInOrder(Some(ticker), Some(inOrder))
}
}
case class TickerInOrder(ticker: Option[Ticker] = None, inOrder: Option[Boolean] = None)
您可以使用 statefulMapConcat
,请参阅文档 https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/statefulMapConcat.html
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
object Stateful {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("Stateful")
Source(List(1,3,2,4,5,6,0,7)).statefulMapConcat {
() =>
var prev = 0L
element =>
if (element > prev) {
prev = element
element :: Nil
} else {
prev = element
Nil
}
}.runForeach(println)
// 1 3 4 5 6 7
}
}
更改代码以使用 Ticker 和序列很简单。
在这个例子中,我有一个 Ticker 实例流,它有一个序列属性。
我想删除所有序列号低于前一条的消息。
我可以做类似下面的事情,但它很难看。有更简单的方法吗?这个图案有名字吗?
source
.scan(TickerInOrder())((state, ticker) => TickerInOrder(state, ticker))
.collect { case TickerInOrder(Some(ticker), Some(inOrder)) if inOrder => ticker }
// ~~~~~~~~
object TickerInOrder {
def apply(state: TickerInOrder, ticker: Ticker): TickerInOrder = {
val inOrder = state.ticker match {
case Some(prev) => ticker.sequence > prev.sequence
case None => true
}
TickerInOrder(Some(ticker), Some(inOrder))
}
}
case class TickerInOrder(ticker: Option[Ticker] = None, inOrder: Option[Boolean] = None)
您可以使用 statefulMapConcat
,请参阅文档 https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/statefulMapConcat.html
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
object Stateful {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("Stateful")
Source(List(1,3,2,4,5,6,0,7)).statefulMapConcat {
() =>
var prev = 0L
element =>
if (element > prev) {
prev = element
element :: Nil
} else {
prev = element
Nil
}
}.runForeach(println)
// 1 3 4 5 6 7
}
}
更改代码以使用 Ticker 和序列很简单。