IO.async Scala 中 cats.effect 的回调问题

IO.async callback problem with cats.effect in Scala

我正在尝试通过 java11 HttpClient 在 Scala

中重写 httpclient

这是我的代码:

import cats.effect._
import java.net.http._
import java.net.http.HttpResponse._
import java.net.http.HttpClient._

trait HttpClients[F[_]] {
  def send(req: HttpRequest)(implicit F: Async[F]): F[HttpResponse[_]]
}

object HttpClients {
  val client: HttpClient = HttpClient.newBuilder().followRedirects(Redirect.ALWAYS).build()
  def newClient[F[_] : Async](): HttpClients[F] = new HttpClients[F] {
    override def send(req: HttpRequest)(implicit F: Async[F]): F[HttpResponse[_]] = F.async { cb =>
      val resp = client.sendAsync(req, BodyHandlers.ofString())
      val s = resp.handle((res: HttpResponse[String], err: Throwable) => {
        if (err == null)
          cb(Right(res))
        else
          cb(Left(err))
      })
      s // TODO ?
      // Type missmatch
      // Required: F[Option[F[Unit]]]
      // Found:    Unit
    }
  }
}

来自 this

的句柄回调

我猜错误来自here,但我不知道接下来怎么写。

然后我做一些改变:

  def newClient[F[_] : Async](): HttpClients[F] = new HttpClients[F] {
    override def send(req: HttpRequest)(implicit F: Async[F]): F[HttpResponse[_]] = F.async[HttpResponse[_]] { cb =>
      val s = Sync[F](F: Async[F]).delay {
        val resp = client.sendAsync(req, BodyHandlers.ofString())
        resp.handle((res: HttpResponse[String], err: Throwable) => {
          if (err == null)
            cb(Right(res))
          else
            cb(Left(err))
        }).join()
      }
      F.delay(s.some)
    }
  }

这次没有报错,但是不知道怎么获取response的body

感谢您的回复!

@OlegPyzhcov 已经在您使用 CE3 的情况下提供了见解,如果您想要的话,这个答案使用的是 CE2

第一个版本的代码是正确的,这里是一个完整的 运行 示例,使用 A​​mmonite 进行了一些样式改进,并确保为每次调用创建一个新客户端和 newClient

的评价
// scala 2.13.5

import $ivy.`org.typelevel::cats-effect:2.5.0`

import cats.effect.{Async, IO}
import cats.syntax.all._
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

trait HttpClients[F[_]] {
  def send(req: HttpRequest): F[HttpResponse[String]]
}

object HttpClients {
  def newClient[F[_]](implicit F: Async[F]): F[HttpClients[F]] =
    F.delay {
      HttpClient
        .newBuilder
        .followRedirects(HttpClient.Redirect.ALWAYS)
        .build()
    } map { client =>
      new HttpClients[F] {
        override def send(req: HttpRequest): F[HttpResponse[String]] =
          F.async { cb =>
            client.sendAsync(req, HttpResponse.BodyHandlers.ofString).handle {
              (res: HttpResponse[String], err: Throwable) =>
                if (err == null) cb(Right(res))
                else cb(Left(err))
            }
          }
      }
    }
}

object Main {
  private val request =
    HttpRequest
      .newBuilder
      .GET
      .uri(URI.create("https://whosebug.com/questions/tagged/scala?tab=Newest"))
      .build()

  private val program = for {
    _ <- IO.delay(println("Hello, World!"))
    client <- HttpClients.newClient[IO]
    response <- client.send(request)
    _ <- IO.delay(println(response))
    _ <- IO.delay(println(response.body))
  } yield ()

  def run(): Unit = {
    program.unsafeRunSync()
  }
}

@main
def main(): Unit = {
  Main.run()
}