Akka 流 - 丢弃一条消息

Akka streams - throw away a message

我的流程如下所示:

case class T1 extends A 
case class T2 extends A
case class T3 extends A

val proc: Flow[A, B, Unit] =   Flow[A] .map {
         case x: T1 => B()
         case x: T2 => B()
         case x: T3 => 
               save_message(x)
               // now throw away the message
 }
 .transform(() => lastStage)

 val lastStage = ...
 def save_message(msg: A): Unit = ...

所以在这段代码中,如果它是派生类型 T1 或 T2,我会收到一条基本类型 A 的消息。但是,如果它是 T3 类型,我只想把它扔掉,而不是将它传递给 lastStage,继续处理下一条消息。我该怎么做?

使用collect代替map:

case class T1 extends A 
case class T2 extends A
case class T3 extends A

val proc: Flow[A, B, Unit] =   Flow[A].collect {
         case x: T1 => B()
         case x: T2 => B()
 }
 .transform(() => lastStage)

 val lastStage = ...