RxJava/RxKotlin 根据子类型拆分流
RxJava/RxKotlin split stream depending on subtype
我有一个 ResponseMessage
流,可以是不同的子类型。我想将流拆分成多个流,以便我可以在其自己的流中处理每种类型。
我的第一次尝试结果是这样的,但我看不出结果。
file.readLines()
.toObservable()
.map { mapper.readValue(it, ResponseMessage::class.java) }
.groupBy { when(it) {
is MarketChangeMessage -> it::class
else -> it::class
}}
.map { it.????? } //How can possible this work?
我现在的问题是:
将流分成一种特定子类型的流的惯用方法是什么?
您可以使用 ofType
运算符:
ofType( ) — emit only those items from the source Observable that are of a particular class.
示例:
val messages = file.readLines()
.toObservable()
.map { mapper.readValue(it, ResponseMessage::class.java) }
.share() // <-- or other multicasting operator
messages
.ofType(MarketChangeMessage::class)
.subscribe()
messages
.ofType(Other::class)
.subscribe()
我有一个 ResponseMessage
流,可以是不同的子类型。我想将流拆分成多个流,以便我可以在其自己的流中处理每种类型。
我的第一次尝试结果是这样的,但我看不出结果。
file.readLines()
.toObservable()
.map { mapper.readValue(it, ResponseMessage::class.java) }
.groupBy { when(it) {
is MarketChangeMessage -> it::class
else -> it::class
}}
.map { it.????? } //How can possible this work?
我现在的问题是: 将流分成一种特定子类型的流的惯用方法是什么?
您可以使用 ofType
运算符:
ofType( ) — emit only those items from the source Observable that are of a particular class.
示例:
val messages = file.readLines()
.toObservable()
.map { mapper.readValue(it, ResponseMessage::class.java) }
.share() // <-- or other multicasting operator
messages
.ofType(MarketChangeMessage::class)
.subscribe()
messages
.ofType(Other::class)
.subscribe()