缩放 ZIO foreachPar

scala ZIO foreachPar

我是并行编程和 ZIO 的新手,我正在尝试通过并行请求从 API 获取数据。

import sttp.client._
import zio.{Task, ZIO}


ZIO.foreach(files) { file =>
    getData(file)
    Task(file.getName)
  }


def getData(file: File) = {

  val data: String = readData(file)
  val request = basicRequest.body(data).post(uri"$url")
      .headers(content -> "text", char -> "utf-8")
      .response(asString)

  implicit val backend: SttpBackend[Identity, Nothing, NothingT] = HttpURLConnectionBackend()
  request.send().body

  resquest.Response match {
    case Success(value) => {
        val src = new PrintWriter(new File(filename))
        src.write(value.toString)
        src.close()
      }
    case Failure(exception) => log error
  }

当我按顺序执行程序时,它按预期工作, 如果我尝试 运行 并行,将 ZIO.foreach 更改为 ZIO.foreachPar。 该程序提前终止,我明白了,我在这里缺少一些基本的东西, 感谢任何帮助,以帮助我解决问题。

当你使用像 ZIO 这样的纯函数式 IO 库时,你不能调用任何有副作用的函数(如 getData),除非调用工厂方法如 Task.effectTask.apply .

ZIO.foreach(files) { file =>
  Task {
    getData(file)
    file.getName
  }
}

一般来说,我不建议将同步阻塞代码与异步非阻塞代码混合使用,而异步非阻塞代码是 ZIO 的主要作用。可以说,关于如何有效地将 ZIO 用于“世界”,有一些精彩的演讲。

我要提出两个关键点,一个是 ZIO 允许您通过附加分配和最终确定步骤有效地管理资源,另一个是我们可以说的“效果”应该包含在“实际与世界互动的事物”中尽可能小的范围*。

所以让我们稍微看一下这个例子,首先,我不建议使用默认的 Identity 支持后端和 ZIO,我建议使用 AsyncHttpClientZioBackend .

import sttp.client._
import zio.{Task, ZIO}
import zio.blocking.effectBlocking
import sttp.client.asynchttpclient.zio.AsyncHttpClientZioBackend

// Extract the common elements of the request
val baseRequest = basicRequest.post(uri"$url")
      .headers(content -> "text", char -> "utf-8")
      .response(asString)

// Produces a writer which is wrapped in a `Managed` allowing it to be properly
// closed after being used
def managedWriter(filename: String): Managed[IOException, PrintWriter] =
  ZManaged.fromAutoCloseable(UIO(new PrintWriter(new File(filename))))


// This returns an effect which produces an `SttpBackend`, thus we flatMap over it
// to extract the backend.
val program = AsyncHttpClientZioBackend().flatMap { implicit backend => 
  ZIO.foreachPar(files) { file =>
    for {
      // Wrap the synchronous reading of data in a `Task`, but which allows runs this effect on a "blocking" threadpool instead of blocking the main one.
      data <- effectBlocking(readData(file))
      // `send` will return a `Task` because it is using the implicit backend in scope
      resp <- baseRequest.body(data).send()
      // Build the managed writer, then "use" it to produce an effect, at the end of `use` it will automatically close the writer.
      _    <- managedWriter("").use(w => Task(w.write(resp.body.toString))) 
    } yield ()
  }
}

此时您将只有 program,您需要 运行 使用 unsafe 方法之一,或者如果您使用 zio.App 通过main 方法。

* 并不总是可行或方便,但它很有用,因为它通过将任务返回到 运行 时间进行调度来防止资源占用。