Scala Akka 流合并过滤器和映射

Scala Akka Streams Merging Filter And Map

我想知道是否有任何方法可以优化以下 Scala 代码,因为它看起来效率不高。 基本上,我只想从流中删除任何不是 Tweet 的对象并将其映射到 Tweet 而不是 Any.

val tweetsFlow = Flow[Any].filter({
  case _: Tweet => true
  case _ => false
}).map({
  case tweet: Tweet => tweet
})

你可以使用collect方法,有些像这样

val tws = Vector(
  "foo",
  Tweet(Author("foo"), tms, "foo #akka bar"),
  1000L,
  Tweet(Author("bar"), tms, "foo #spray bar"),
  Tweet(Author("baz"), tms, "foo bar"),
  1
)

val tflow = Flow[Any].collect {
  case x: Tweet => x
}
Source(tws).via(tflow)

由于您只对特定类型的过滤感兴趣,您可以使用 collectType:

// case class Tweet(title: String)
// val mixedSource: Source[Any, NotUsed] = Source(List(Tweet("1"), 3, Tweet("2")))
val tweetsSource: Source[Tweet, NotUsed] = mixedSource.collectType[Tweet]
// tweetsSource.runForeach(println)
// Tweet("1")
// Tweet("2")