将操作应用于所有先前发出的项目
Applying an operation to all previously emitted items
我有一个项目队列,一旦服务器可访问就会发送这些项目:
val queueHistory: Observable<QueuedItem>
队列项是:
data class QueuedItem(val item: Item, val sent: Boolean = false)
queueHistory 永远不会完成,它只记录项目何时排队等待发送 onNext(QueuedItem(item1, false)
,然后稍后记录它已发送 onNext(QueuedItem(item1, true)
。
我想做的是计算当前有多少未发送的项目。
我遇到麻烦的主要原因是列表未完成,我最初考虑使用 collect
,但需要完整的列表。
我正在尝试将 scan
与
之类的东西一起使用
queueHistory.scan { items: ScannedItems, item -> ScannedItems(arrayOf(*items, item), 0) }
到目前为止,我可以保留当前遇到的项目列表,但扫描希望所有内容都是同一类型。
我的另一个想法是
queueHistory
.groupBy { it.item }
.flatMapSingle { it.toList() }
.map { it.size % 2 }
但是 toList() 需要一个有限列表。
如有任何想法,我们将不胜感激!
有一个 #scan
重载,它接受一个种子值和一个 lambda,它接受两个参数(prev,current)。 prev
参数与seed同类型,current
参数与upstream同类型
示例
class So65587608 {
@Test
fun `65587608`() {
val producer = PublishSubject.create<QueuedItem>()
val map = producer.scan(mutableListOf<QueuedItem>(), { list, curr ->
// when send = true -> remove
if (curr.sent) {
list.removeIf { it.item == curr.item }
} else if (!list.any { it.item == curr.item }) {
list.add(curr)
}
list.toMutableList()
}).map { list -> list as List<QueuedItem> }
map.subscribe {
println(it)
}
val test = map.test()
producer.onNext(QueuedItem("1"))
producer.onNext(QueuedItem("1", true))
producer.onNext(QueuedItem("2", true))
producer.onNext(QueuedItem("3", true))
producer.onNext(QueuedItem("4"))
producer.onNext(QueuedItem("5"))
producer.onNext(QueuedItem("4", true))
}
data class QueuedItem(val item: String, val sent: Boolean = false)
}
输出
[] // seed value
[QueuedItem(item=1, sent=false)]
[]
[]
[]
[QueuedItem(item=4, sent=false)]
[QueuedItem(item=4, sent=false), QueuedItem(item=5, sent=false)]
[QueuedItem(item=5, sent=false)]
备注
您必须在每次 #scan 迭代时复制列表或使用来自持久数据集合的不可变列表。
此外,这可能不是一个好的实现,因为列表是未绑定的,这可能会占用您所有的内存。如果列表足够大,线性搜索也可能需要一些时间,这可能不太好。人们应该考虑如何更好地完成循环。
我有一个项目队列,一旦服务器可访问就会发送这些项目:
val queueHistory: Observable<QueuedItem>
队列项是:
data class QueuedItem(val item: Item, val sent: Boolean = false)
queueHistory 永远不会完成,它只记录项目何时排队等待发送 onNext(QueuedItem(item1, false)
,然后稍后记录它已发送 onNext(QueuedItem(item1, true)
。
我想做的是计算当前有多少未发送的项目。
我遇到麻烦的主要原因是列表未完成,我最初考虑使用 collect
,但需要完整的列表。
我正在尝试将 scan
与
之类的东西一起使用
queueHistory.scan { items: ScannedItems, item -> ScannedItems(arrayOf(*items, item), 0) }
到目前为止,我可以保留当前遇到的项目列表,但扫描希望所有内容都是同一类型。
我的另一个想法是
queueHistory
.groupBy { it.item }
.flatMapSingle { it.toList() }
.map { it.size % 2 }
但是 toList() 需要一个有限列表。
如有任何想法,我们将不胜感激!
有一个 #scan
重载,它接受一个种子值和一个 lambda,它接受两个参数(prev,current)。 prev
参数与seed同类型,current
参数与upstream同类型
示例
class So65587608 {
@Test
fun `65587608`() {
val producer = PublishSubject.create<QueuedItem>()
val map = producer.scan(mutableListOf<QueuedItem>(), { list, curr ->
// when send = true -> remove
if (curr.sent) {
list.removeIf { it.item == curr.item }
} else if (!list.any { it.item == curr.item }) {
list.add(curr)
}
list.toMutableList()
}).map { list -> list as List<QueuedItem> }
map.subscribe {
println(it)
}
val test = map.test()
producer.onNext(QueuedItem("1"))
producer.onNext(QueuedItem("1", true))
producer.onNext(QueuedItem("2", true))
producer.onNext(QueuedItem("3", true))
producer.onNext(QueuedItem("4"))
producer.onNext(QueuedItem("5"))
producer.onNext(QueuedItem("4", true))
}
data class QueuedItem(val item: String, val sent: Boolean = false)
}
输出
[] // seed value
[QueuedItem(item=1, sent=false)]
[]
[]
[]
[QueuedItem(item=4, sent=false)]
[QueuedItem(item=4, sent=false), QueuedItem(item=5, sent=false)]
[QueuedItem(item=5, sent=false)]
备注
您必须在每次 #scan 迭代时复制列表或使用来自持久数据集合的不可变列表。
此外,这可能不是一个好的实现,因为列表是未绑定的,这可能会占用您所有的内存。如果列表足够大,线性搜索也可能需要一些时间,这可能不太好。人们应该考虑如何更好地完成循环。