如何抽象效果并将 ContextShift 与 Scala Cats 一起使用?
How do I abstract over effects and use ContextShift with Scala Cats?
我正在 Scala 和 Cats 中创建一个函数,该函数执行一些 I/O 并将由代码的其他部分调用。我也在学习猫,我希望我的功能:
- 效果通用,使用
F[_]
- 运行 在专用线程池上
- 我想引入异步边界
我假设我的所有函数在 F[_] 中都是通用的,直到 main 方法,因为我试图遵循 these Cat's guidelines
但我很难通过使用 ContextShift
或 ExecutionContext
来使这些约束起作用。我写了一个 full example here,这是示例的摘录:
object ComplexOperation {
// Thread pool for ComplexOperation internal use only
val cs = IO.contextShift(
ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
)
// Complex operation that takes resources and time
def run[F[_]: Sync](input: String): F[String] =
for {
r1 <- Sync[F].delay(cs.shift) *> op1(input)
r2 <- Sync[F].delay(cs.shift) *> op2(r1)
r3 <- Sync[F].delay(cs.shift) *> op3(r2)
} yield r3
def op1[F[_]: Sync](input: String): F[Int] = Sync[F].delay(input.length)
def op2[F[_]: Sync](input: Int): F[Boolean] = Sync[F].delay(input % 2 == 0)
def op3[F[_]: Sync](input: Boolean): F[String] = Sync[F].delay(s"Complex result: $input")
}
这显然不会抽象过度效果,因为 ComplexOperation.run
需要 ContextShift[IO]
才能引入异步边界。这样做的正确(或最佳)方法是什么?
在 ComplexOperation.run
中创建 ContextShift[IO]
会使函数依赖于 IO
,这是我不想要的。
在调用者上移动 ContextShift[IO]
的创建只会转移问题:调用者在 F[_]
中也是通用的,那么它如何获得 ContextShift[IO]
以传递给 ComplexOperation.run
而无需明确取决于 IO
?
请记住,我不想使用在最顶层定义的全局 ContextShift[IO]
,但我希望每个组件自行决定。
我的 ComplexOperation.run
应该创建 ContextShift[IO]
还是调用者的责任?
我至少做对了吗?还是我违反了标准做法?
所以我冒昧地重写了您的代码,希望对您有所帮助:
import cats.effect._
object Functions {
def sampleFunction[F[_]: Sync : ContextShift](file: String, blocker: Blocker): F[String] = {
val handler: Resource[F, Int] =
Resource.make(
blocker.blockOn(openFile(file))
) { handler =>
blocker.blockOn(closeFile(handler))
}
handler.use(handler => doWork(handler))
}
private def openFile[F[_]: Sync](file: String): F[Int] = Sync[F].delay {
println(s"Opening file $file with handler 2")
2
}
private def closeFile[F[_]: Sync](handler: Int): F[Unit] = Sync[F].delay {
println(s"Closing file handler $handler")
}
private def doWork[F[_]: Sync](handler: Int): F[String] = Sync[F].delay {
println(s"Calculating the value on file handler $handler")
"The final value"
}
}
object Main extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val result = Blocker[IO].use { blocker =>
Functions.sampleFunction[IO](file = "filePath", blocker)
}
for {
data <- result
_ <- IO(println(data))
} yield ExitCode.Success
}
}
可以看到运行宁here.
那么,这段代码的作用是什么。
首先,它为文件创建一个 Resource,因为 close
必须完成,即使在保证或失败时也是如此。
它在阻塞线程 poo 上使用 Blocker 到 运行 open
和 close
操作(这是使用 ContextShift).
最后,在主要方面,它为 **IO* 创建了一个默认的 Blocker,并使用它来调用您的函数;并打印结果。
欢迎提问。
我正在 Scala 和 Cats 中创建一个函数,该函数执行一些 I/O 并将由代码的其他部分调用。我也在学习猫,我希望我的功能:
- 效果通用,使用
F[_]
- 运行 在专用线程池上
- 我想引入异步边界
我假设我的所有函数在 F[_] 中都是通用的,直到 main 方法,因为我试图遵循 these Cat's guidelines
但我很难通过使用 ContextShift
或 ExecutionContext
来使这些约束起作用。我写了一个 full example here,这是示例的摘录:
object ComplexOperation {
// Thread pool for ComplexOperation internal use only
val cs = IO.contextShift(
ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
)
// Complex operation that takes resources and time
def run[F[_]: Sync](input: String): F[String] =
for {
r1 <- Sync[F].delay(cs.shift) *> op1(input)
r2 <- Sync[F].delay(cs.shift) *> op2(r1)
r3 <- Sync[F].delay(cs.shift) *> op3(r2)
} yield r3
def op1[F[_]: Sync](input: String): F[Int] = Sync[F].delay(input.length)
def op2[F[_]: Sync](input: Int): F[Boolean] = Sync[F].delay(input % 2 == 0)
def op3[F[_]: Sync](input: Boolean): F[String] = Sync[F].delay(s"Complex result: $input")
}
这显然不会抽象过度效果,因为 ComplexOperation.run
需要 ContextShift[IO]
才能引入异步边界。这样做的正确(或最佳)方法是什么?
在 ComplexOperation.run
中创建 ContextShift[IO]
会使函数依赖于 IO
,这是我不想要的。
在调用者上移动 ContextShift[IO]
的创建只会转移问题:调用者在 F[_]
中也是通用的,那么它如何获得 ContextShift[IO]
以传递给 ComplexOperation.run
而无需明确取决于 IO
?
请记住,我不想使用在最顶层定义的全局 ContextShift[IO]
,但我希望每个组件自行决定。
我的 ComplexOperation.run
应该创建 ContextShift[IO]
还是调用者的责任?
我至少做对了吗?还是我违反了标准做法?
所以我冒昧地重写了您的代码,希望对您有所帮助:
import cats.effect._
object Functions {
def sampleFunction[F[_]: Sync : ContextShift](file: String, blocker: Blocker): F[String] = {
val handler: Resource[F, Int] =
Resource.make(
blocker.blockOn(openFile(file))
) { handler =>
blocker.blockOn(closeFile(handler))
}
handler.use(handler => doWork(handler))
}
private def openFile[F[_]: Sync](file: String): F[Int] = Sync[F].delay {
println(s"Opening file $file with handler 2")
2
}
private def closeFile[F[_]: Sync](handler: Int): F[Unit] = Sync[F].delay {
println(s"Closing file handler $handler")
}
private def doWork[F[_]: Sync](handler: Int): F[String] = Sync[F].delay {
println(s"Calculating the value on file handler $handler")
"The final value"
}
}
object Main extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val result = Blocker[IO].use { blocker =>
Functions.sampleFunction[IO](file = "filePath", blocker)
}
for {
data <- result
_ <- IO(println(data))
} yield ExitCode.Success
}
}
可以看到运行宁here.
那么,这段代码的作用是什么。
首先,它为文件创建一个 Resource,因为 close
必须完成,即使在保证或失败时也是如此。
它在阻塞线程 poo 上使用 Blocker 到 运行 open
和 close
操作(这是使用 ContextShift).
最后,在主要方面,它为 **IO* 创建了一个默认的 Blocker,并使用它来调用您的函数;并打印结果。
欢迎提问。