RxJava/RxKotlin 根据子类型拆分流

RxJava/RxKotlin split stream depending on subtype

我有一个 ResponseMessage 流,可以是不同的子类型。我想将流拆分成多个流,以便我可以在其自己的流中处理每种类型。

我的第一次尝试结果是这样的,但我看不出结果。

file.readLines()
        .toObservable()
        .map { mapper.readValue(it, ResponseMessage::class.java) }
        .groupBy { when(it) {
            is MarketChangeMessage -> it::class
            else -> it::class
        }}
        .map { it.????? } //How can possible this work?

我现在的问题是: 将流分成一种特定子类型的流的惯用方法是什么?

您可以使用 ofType 运算符:

ofType( ) — emit only those items from the source Observable that are of a particular class.

示例:

val messages = file.readLines()
    .toObservable()
    .map { mapper.readValue(it, ResponseMessage::class.java) }
    .share() // <-- or other multicasting operator

messages
    .ofType(MarketChangeMessage::class)
    .subscribe()

messages
    .ofType(Other::class)
    .subscribe()