用于组合来自不同来源的数据的 FP 模式(最好是 Kotlin 和 Arrow)

FP patterns for combining data from different sources (preferrably in Kotlin and Arrow)

预先免责声明:最近,我对函数式编程的兴趣增加了,我已经能够应用最基本的方法(尽可能多地使用我的知识和工作的纯函数环境允许)在我的工作中。然而,当涉及到更高级的技术时,我仍然非常缺乏经验,我认为通过在此站点上提问来尝试学习一些可能是正确的主意。我以前也遇到过类似的问题,所以我觉得 FP 应该有一些模式来处理这类问题。

问题描述

归结为以下几点。假设某处有一个 API 提供所有可能宠物的列表。

data class Pet(val name: String, val isFavorite: Boolean = false)

fun fetchAllPetsFromApi(): List<Pet> {
    // this would call a real API irl
    return listOf(Pet("Dog"), Pet("Cat"), Pet("Parrot"))
}

这个 API 对“收藏夹”字段一无所知,也不应该。这不在我的控制之下。它基本上只是返回一个宠物列表。现在我想允许用户将宠物标记为他们最喜欢的。我将这个标志存储在本地数据库中。

所以从 api 中获取所有宠物后,我必须根据持久化数据设置收藏夹标志。

class FavoriteRepository {
    fun petsWithUserFavoriteFlag(allPets: List<Pet>) {
        return allPets.map { it.copy(isFavorite = getFavoriteFlagFromDbFor(it) }
    }

    fun markPetAsFavorite(pet: Pet) {
        // persist to db ...
    }

    fun getFavoriteFlagFromDbFor(pet: Pet): Boolean {...}
}

出于某种原因,我认为这段代码处理的问题是“从一个数据源获取一部分信息,然后将其与另一个数据源的一些信息合并" 可能会受益于 FP 模式的应用,但我不太确定该往哪个方向看。

我已经阅读了 Arrow 的一些文档(伟大的项目 btw :))并且我是 Kotlin 的狂热爱好者,因此非常感谢使用这个库的答案。

这是我可能会做的事情。你的代码有几个重要的缺陷,从函数式编程的角度来看是不安全的:

  • 它不标记副作用,因此编译器不知道这些副作用,也无法跟踪它们的使用方式。这意味着我们可以从任何地方调用这些效果而无需任何控制。影响的例子是网络查询或使用数据库的所有操作。
  • 您的操作没有明确说明它们可能成功或失败的事实,因此调用者只能尝试/捕获异常,否则程序将崩溃。因此,处理这两种情况的要求并不高,这可能会导致遗漏一些异常,从而导致 运行 时间错误。

让我们尝试修复它。让我们从对我们的域错误建模开始,这样我们就有了一组我们的域可以理解的预期错误。让我们也创建一个映射器,以便我们将抛出的所有潜在异常映射到那些预期的域错误之一,以便我们的业务逻辑可以相应地做出反应。

sealed class Error {
    object Error1 : Error()
    object Error2 : Error()
    object Error3 : Error()
}

// Stubbed
fun Throwable.toDomainError() = Error.Error1

如您所见,我们正在对错误和映射器进行存根。您可以花时间在体系结构级别上设计您的域需要的错误,并为这些错误编写适当的纯映射器。让我们继续前进。

是时候标记我们的效果以使编译器知道这些了。为此,我们在 Kotlin 中使用 suspendsuspend 在编译时强制执行调用上下文,因此您永远无法调用效果,除非您处于暂停环境或集成点(协程)中。我们将在此处标记为暂停所有可能产生副作用的操作:网络请求和所有数据库操作。

我还可以自由地将所有数据库操作拉出给 Database 协作者,只是为了便于阅读。

suspend fun fetchAllPetsFromApi(): List<Pet> = ...

class FavoriteRepository(private val db: Database = Database()) {
    suspend fun petsWithUserFavoriteFlag(allPets: List<Pet>) {
        ... will delegate in the Database ops
    }
}

class Database {
    // This would flag it as fav on the corresponding table
    suspend fun markPetAsFavorite(pet: Pet): Pet = ...

    // This would get the flag from the corresponding table
    suspend fun getFavoriteFlagFromDbFor(pet: Pet) = ...
}

我们的副作用现在是安全的。相反,它们变成了对效果的描述,因为如果不提供能够 运行 暂停效果(协程或其他暂停功能)的环境,我们永远无法 运行 它们。在功能术语中,我们会说我们的效果现在 纯粹

现在,让我们开始第二期。

我们还说过,我们没有明确说明每个效果可能成功或失败的事实,因此调用者可能会错过潜在的抛出异常并导致程序崩溃。我们可以通过使用函数 Either<A, B> 数据类型包装数据来引起对数据的关注。让我们将这两种想法结合起来:

suspend fun fetchAllPetsFromApi(): Either<Error, List<Pet>> = ...

class FavoriteRepository(private val db: Database = Database()) {
    suspend fun petsWithUserFavoriteFlag(allPets: List<Pet>): Either<Error, List<Pet>> {
        ... will delegate in the Database ops
    }
}

class Database {
    // This would flag it as fav on the corresponding table
    suspend fun markPetAsFavorite(pet: Pet): Either<Error, Pet> = ...

    // This would get the flag from the corresponding table
    suspend fun getFavoriteFlagFromDbFor(pet: Pet): Either<Error, Boolean> = ...
}

现在这明确了一个事实,即这些计算中的每一个都可能成功或失败,因此调用者将被迫处理双方并且不会忘记处理潜在的错误。我们在这里使用我们的好处的类型。

现在让我们为效果添加逻辑:

// Stubbing a list of pets but you'd have your network request within the catch block
suspend fun fetchAllPetsFromApi(): Either<Error, List<Pet>> =
    Either.catch { listOf(Pet("Dog"), Pet("Cat")) }.mapLeft { it.toDomainError() }

我们可以使用 Either#catch 来包装任何可能抛出的暂停效果。这会自动将结果包装到 Either 中,因此我们可以继续对其进行计算。

更具体地说,它将块的结果包装在 Either.Right 中以防它成功 ,或者将异常包装到 Either.Left 万一它抛出。我们还有 mapLeft 将抛出的潜在异常(Left 端)映射到我们的强类型域错误之一。这就是为什么它 returns Either<Error, List<Pet>> 而不是 Either<Throwable, List<Pet>>.

请注意,对于 Either,我们总是在左侧对错误建模。这是约定俗成的,因为 Right 代表快乐的路径,我们希望我们的成功数据在那里,所以我们可以用 mapflatMap 或其他任何东西继续计算它。

我们现在可以将相同的想法应用于我们的数据库方法:

class Database {
    // This would flag it as fav on the corresponding table, I'm stubbing it here for the example.
    suspend fun markPetAsFavorite(pet: Pet): Either<Error, Pet> =
        Either.catch { pet }.mapLeft { it.toDomainError() }

    // This would get the flag from the corresponding table, I'm stubbing it here for the example.
    suspend fun getFavoriteFlagFromDbFor(pet: Pet): Either<Error, Boolean> =
        Either.catch { true }.mapLeft { it.toDomainError() }
}

我们再次对结果进行存根,但您可以想象我们会在上面的每个 Either.catch {} 块中加载或更新数据库表中的实际暂停效果。

最后,我们可以向 repo 添加一些逻辑:

class FavoriteRepository(private val db: Database = Database()) {

    suspend fun petsWithUserFavoriteFlag(allPets: List<Pet>): Either<Error, List<Pet>> =
        allPets.map { pet ->
            db.getFavoriteFlagFromDbFor(pet).map { isFavInDb ->
                pet.copy(isFavorite = isFavInDb)
            }
        }.sequence(Either.applicative()).fix().map { it.toList() }
}

好的,由于我们的效果是如何编写的,这个可能有点复杂,但我会尽量说清楚。

我们需要映射列表,以便对于从网络加载的每个宠物,我们可以从 Database 加载其收藏状态。然后我们像你一样复制它。但是给定 getFavoriteFlagFromDbFor(pet) returns Either<Error, Booelan> 现在我们的结果是 List<Either<Error, Pet>> 这可能会使处理完整的宠物列表变得困难,因为我们需要迭代,对于每一个,我们首先需要检查它是 Left 还是 Right.

为了更轻松地整体使用 List<Pet>,我们可能希望在此处交换类型,因此我们将使用 Either<Error, List<Pet>>

对于这个魔法,一个选项是 sequencesequence 在这种情况下需要 Either 应用程序,因为它将用于将中间结果和最终列表提升到 Either.

我们也利用机会将 ListK 映射到标准库 List,因为 ListKsequence 内部使用的,但我们可以理解概括地说,它是 List 上的一个函数,所以你有一个想法。因为在这里我们只对实际列表感兴趣以匹配我们的类型,我们可以将 Right<ListK<Pet>> 映射到 Right<List<Pet>>.

最后,我们可以继续使用这个挂起的程序了:

suspend fun main() {
    val repo = FavoriteRepository()
    val hydratedPets = fetchAllPetsFromApi().flatMap { pets -> repo.petsWithUserFavoriteFlag(pets) }
    hydratedPets.fold(
        ifLeft = { error -> println(error) },
        ifRight = { pets -> println(pets) }
    )
}

我们打算 flatMap 因为我们这里有顺序操作。

我们可以做一些潜在的优化,比如使用 parTraverse 从数据库中并行加载宠物列表的所有收藏状态并最终收集结果,但我没有使用它,因为我我不确定您的数据库是否已准备好进行并发访问。

您可以这样做:

suspend fun petsWithUserFavoriteFlag(allPets: List<Pet>): Either<Error, List<Pet>> =
        allPets.parTraverse { pet -> 
            db.getFavoriteFlagFromDbFor(pet).map { isFavInDb ->
                pet.copy(isFavorite = isFavInDb)
            }
        }.sequence(Either.applicative()).fix().map { it.toList() }

我认为我们还可以通过更改某些类型和操作的结构来进一步简化整个事情,但不确定是否要从您的代码库中重构太多,因为我不知道您当前的情况团队限制。

这是完整的代码库:

import arrow.core.Either
import arrow.core.extensions.either.applicative.applicative
import arrow.core.extensions.list.traverse.sequence
import arrow.core.extensions.listk.foldable.toList
import arrow.core.fix
import arrow.core.flatMap

data class Pet(val name: String, val isFavorite: Boolean = false)

// Our sealed hierarchy of potential errors our domain understands
sealed class Error {
    object Error1 : Error()
    object Error2 : Error()
    object Error3 : Error()
}

// Stubbed, would be a mapper from throwable to any of the expected domain errors used via mapLeft.
fun Throwable.toDomainError() = Error.Error1

// This would call a real API irl, stubbed here for the example.
suspend fun fetchAllPetsFromApi(): Either<Error, List<Pet>> =
    Either.catch { listOf(Pet("Dog"), Pet("Cat")) }.mapLeft { it.toDomainError() }

class FavoriteRepository(private val db: Database = Database()) {

    suspend fun petsWithUserFavoriteFlag(allPets: List<Pet>): Either<Error, List<Pet>> =
        allPets.map { pet ->
            db.getFavoriteFlagFromDbFor(pet).map { isFavInDb ->
                pet.copy(isFavorite = isFavInDb)
            }
        }.sequence(Either.applicative()).fix().map { it.toList() }
}

class Database {
    // This would flag it as fav on the corresponding table, I'm stubbing it here for the example.
    suspend fun markPetAsFavorite(pet: Pet): Either<Error, Pet> =
        Either.catch { pet }.mapLeft { it.toDomainError() }

    // This would get the flag from the corresponding table, I'm stubbing it here for the example.
    suspend fun getFavoriteFlagFromDbFor(pet: Pet): Either<Error, Boolean> =
        Either.catch { true }.mapLeft { it.toDomainError() }
}

suspend fun main() {
    val repo = FavoriteRepository()
    val hydratedPets = fetchAllPetsFromApi().flatMap { pets -> repo.petsWithUserFavoriteFlag(pets) }
    hydratedPets.fold(
        ifLeft = { error -> println(error) },
        ifRight = { pets -> println(pets) }
    )
}