如何使用关心下一个元素的谓词来限制 Kotlin 序列?

How to limit a Kotlin sequence with a predicate caring about the next element?

我需要处理 API 的 returns 分块数据。

val response = server.getChunk(request, chunkNumber)

data class PageInfo(
    val pageNumber: Int,
    val maxPages: Int
)
data class Response(
    val elements: List<Payload>,
    val pageInfo: PageInfo
)

我假设这个 API 是幂等的。

我可以做类似的事情(取 1):

var i = 0
var maxPages: Int
val payloadTotal = mutableListOf<Payload>()
do {
    val response = server.getChunk(request, i++)
    maxPages = response.pageInfo.maxPages
    payloadTotal.addAll(response.payload)
} while (i < maxPages)
    

可以用,但是很丑。

取2:

val partners = IntStream
    .iterate(0) { it + 1 }
    .asSequence()
    .map { server.getChunk(request, it) }
    .takeWhile { it.pageInfo.pageNumber < it.pageInfo.maxPages }
    .map { it.payload }
    .flatten()
    .toList()

它看起来好多了,但并没有完全按预期工作:因为我们在已经请求服务器之后检查 takeWhile 谓词,所以我们总是会做一个额外的请求。由于我们在执行第一个请求之前不知道 maxPages 值,因此我们不能使用 take(n: Int).

之类的东西

所以我最终使用了那个代码(取 3):

var maxPages = Int.MAX_VALUE
val partners = IntStream
    .iterate(0) { it + 1 }
    .asSequence()
    .takeWhile { it < maxPages }
    .map {
        server.getChunk(request, it)
            .also { response -> maxPages = response.pageInfo.maxPages }
    }
    .map { it.payload }
    .flatten()
    .toList()
        

有效。但是话又说回来,我们使用了这个我不喜欢的额外状态。

此外,由于“有效最终”限制,这在 Java 中根本不起作用(我希望它能作为奖励)。

那么如何使用函数式方法实现所需的行为呢?有没有办法在不跟踪管道外的任何添加状态的情况下做到这一点?

*如果存在这样的方式,它是否也转换为Java?

我们可以为此创建自己的序列运算符。它需要编写一些代码,但是我们可以很好地使用它:

generateSequence(0) { it + 1 }
    .map { server.getChunk(request, it) }
    .takeNextWhile { it.pageInfo.pageNumber + 1 < it.pageInfo.maxPages }
    .flatMap { it.elements }
    .toList()

fun <T> Sequence<T>.takeNextWhile(predicate: (T) -> Boolean): Sequence<T> = Sequence {
    val iter = iterator()
    var hasNext = iter.hasNext()
    object : Iterator<T> {
        override fun hasNext() = hasNext
        override fun next() = iter.next().also { hasNext = iter.hasNext() && predicate(it) }
    }
}

或者,如果对我们来说更有意义,我们可以将其命名为 takeWhilePrevious()

如果我们不想创建新的运算符,那么我认为最好不要使用函数转换,因为在这种情况下它们会降低代码的可读性。假设maxPages不变,我们可以先获取,然后执行一个很简单的循环:

val first = server.getChunk(request, 0)
val payloads = first.elements.toMutableList()
(1 until first.pageInfo.maxPages).forEach {
    payloads += server.getChunk(request, it).elements
}

使其工作的小解决方法,您可以将空响应对象作为种子传递给 generateSequence

val partners = generateSequence(0 to (null as Response?)) { (page, _) ->
    page+1 to getChunk(page)
}.takeWhile { (page, response) ->
    response == null || page < response.maxPages
}.flatMap {
    it.second?.payload ?: emptyList()
}.toList()

(虽然可读性目标仍未解决。)可以使用函数引入一些清晰度

data class PageResponse(val page: Int = 0, val response: Response? = null) {
  fun isNotLastPage() = response == null || page < response.maxPages

  fun getPayload() = response?.payload ?: emptyList()
}

val partners = generateSequence(PageResponse()) { (page, _) ->
  PageResponse(page + 1, getChunk(page))
}
.takeWhile(PageResponse::isNotLastPage)
.flatMap(PageResponse::getPayload)
.toList()

在这种情况下,我可能只是从 do-while 循环中创建一个序列。对于需要阅读实现的人来说非常可读,对于只关心序列的人非常有用:

fun Server.requestInPages(request: Request, startPage: Int = 0): Sequence<Response> = sequence {
    var page = startPage
    do {
        val response = getChunk(request, page++)
        yield(response)
    } while (response.pageInfo.pageNumber < response.pageInfo.maxPages)
}

然后像这样使用它:

val partners = server.requestInPages(request)
    .flatMap { it.elements }
    .toList()

如果响应完全相同,您甚至可以在 requestInPages 之上添加另一个包含 flatMap 的助手。如果根本没有人关心响应结构和 PageInfo,您甚至可以将 flatMap 设为 requestInPages.

的一部分

奖励积分:如果您的 API 调用在某个时候变成 suspend 函数(例如,如果您切换到协程),这甚至可以变成一个流程。