Akka 流 - 丢弃一条消息
Akka streams - throw away a message
我的流程如下所示:
case class T1 extends A
case class T2 extends A
case class T3 extends A
val proc: Flow[A, B, Unit] = Flow[A] .map {
case x: T1 => B()
case x: T2 => B()
case x: T3 =>
save_message(x)
// now throw away the message
}
.transform(() => lastStage)
val lastStage = ...
def save_message(msg: A): Unit = ...
所以在这段代码中,如果它是派生类型 T1 或 T2,我会收到一条基本类型 A 的消息。但是,如果它是 T3 类型,我只想把它扔掉,而不是将它传递给 lastStage,继续处理下一条消息。我该怎么做?
使用collect
代替map
:
case class T1 extends A
case class T2 extends A
case class T3 extends A
val proc: Flow[A, B, Unit] = Flow[A].collect {
case x: T1 => B()
case x: T2 => B()
}
.transform(() => lastStage)
val lastStage = ...
我的流程如下所示:
case class T1 extends A
case class T2 extends A
case class T3 extends A
val proc: Flow[A, B, Unit] = Flow[A] .map {
case x: T1 => B()
case x: T2 => B()
case x: T3 =>
save_message(x)
// now throw away the message
}
.transform(() => lastStage)
val lastStage = ...
def save_message(msg: A): Unit = ...
所以在这段代码中,如果它是派生类型 T1 或 T2,我会收到一条基本类型 A 的消息。但是,如果它是 T3 类型,我只想把它扔掉,而不是将它传递给 lastStage,继续处理下一条消息。我该怎么做?
使用collect
代替map
:
case class T1 extends A
case class T2 extends A
case class T3 extends A
val proc: Flow[A, B, Unit] = Flow[A].collect {
case x: T1 => B()
case x: T2 => B()
}
.transform(() => lastStage)
val lastStage = ...