How to download an HTTP resource to a file with Akka Streams and HTTP?
For the past few days I have been trying to work out the best way to download an HTTP resource to a file using Akka Streams and HTTP.
I started with a Future-based variant that looked like this:
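import java.io.File
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.Uri
import akka.stream.scaladsl.FileIO
import scala.concurrent.Future

// Assumes an implicit ActorSystem, Materializer and ExecutionContext are in scope.
def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}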
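That was fine, but once I had learned more about pure Akka Streams I wanted to try a Flow-based variant that builds a stream starting from a Source[HttpRequest]. At first this completely stumped me, until I stumbled upon the flatMapConcat flow transformation. The result is a little more verbose: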
def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
  case (responseTry, context) => (responseTry.get, context)
}

def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
  case (response, _) => response.entity.dataBytes
}

def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  source.
    via(requestResponseFlow).
    map(responseOrFail).
    flatMapConcat(responseToByteSource).
    runWith(FileIO.toFile(file))
}
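To see what flatMapConcat does in isolation, here is a minimal sketch that uses in-memory sources instead of HTTP responses. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream; the object and method names are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object FlatMapConcatDemo {
  // Each outer element is turned into an inner Source. The inner sources are
  // consumed one after another, so elements come out in outer-element order.
  def expand(numbers: List[Int])(implicit system: ActorSystem): Future[Seq[Int]] =
    Source(numbers)
      .flatMapConcat(n => Source(List(n, n * 10)))
      .runWith(Sink.seq)
}
```

In downloadViaFlow the outer elements are responses and each inner source is one response's dataBytes, so the bytes of one response are fully drained before the next one starts.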
Then I wanted to get a little trickier and use the Content-Disposition header to name the file.
Going back to the Future-based variant:
def destinationFile(downloadDir: File, response: HttpResponse): File = {
  // .get throws NoSuchElementException if the response has no Content-Disposition header
  val fileName = response.header[ContentDisposition].get.value
  val file = new File(downloadDir, fileName)
  file.createNewFile()
  file
}

def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val file = destinationFile(downloadDir, response)
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}
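One thing to watch in destinationFile: a Content-Disposition value usually looks like `attachment; filename="report.pdf"` rather than a bare file name, so if `value` returns the whole header value it cannot be used directly as a file name. The following is a rough sketch of pulling out the filename parameter with the standard library only. The helper name `fileName` is hypothetical, and the regex handles only the plain `filename=` form, not the extended `filename*=` encoding.

```scala
object ContentDispositionName {
  // Matches filename=value, where value is either quoted or a bare token.
  private val FileNameParam = """(?i)filename\s*=\s*(?:"([^"]*)"|([^;\s]+))""".r

  /** Hypothetical helper: extracts the filename parameter from a raw header value. */
  def fileName(headerValue: String): Option[String] =
    FileNameParam.findFirstMatchIn(headerValue).flatMap { m =>
      // Group 1 is the quoted form, group 2 the bare-token form.
      val raw = Option(m.group(1)).getOrElse(m.group(2))
      // Keep only the last path segment so the name cannot point outside the download directory.
      raw.split("[/\\\\]").lastOption.filter(_.nonEmpty)
    }
}
```

Returning an Option also gives the caller a place to fall back to a default name when the header carries no usable filename.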
But now I have no idea how to do this with the Flow-based variant. This is as far as I got:
def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
  case (response, _) =>
    val source = responseToByteSource(in)
    val file = destinationFile(downloadDir, response)
    source.map((_, file))
}

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  val sourceWithDest: Source[(ByteString, File), Unit] = source.
    via(requestResponseFlow).
    map(responseOrFail).
    flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
  sourceWithDest.runWith(???)
}
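So now I have a Source that emits one or more (ByteString, File) elements for each File (I say "each File" because there is no reason the original Source has to be a single HttpRequest).
Is there any way to route these elements to a dynamic Sink?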
I am thinking of something along the lines of flatMapConcat, such as:
def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???
so that I could complete downloadViaFlow2 like this:
def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {
  val sink = FileIO.toFile(destination, true)
  Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right)
}

sourceWithDest.runWithMap {
  case (_, file) => destToSink(file)
}
The solution does not require flatMapConcat. If you do not need any return value from the file writing, you can use Sink.foreach:
def writeFile(downloadDir: File)(httpResponse: HttpResponse): Future[Long] = {
  val file = destinationFile(downloadDir, httpResponse)
  httpResponse.entity.dataBytes.runWith(FileIO.toFile(file))
}

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Unit] = {
  val request = HttpRequest(uri = uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()

  source.via(requestResponseFlow)
    .map(responseOrFail)
    .map(_._1)
    .runWith(Sink.foreach(writeFile(downloadDir)))
}
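Note that Sink.foreach creates Futures from the writeFile function without waiting for them, so there is not much back-pressure involved. The hard drive could slow writeFile down, but the stream would keep generating Futures. To control this you can use Flow.mapAsyncUnordered (or Flow.mapAsync):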
val parallelism = 10

source.via(requestResponseFlow)
  .map(responseOrFail)
  .map(_._1)
  .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
  .runWith(Sink.ignore)
If you want to accumulate the Long values into a total count, combine this with Sink.fold:
source.via(requestResponseFlow)
  .map(responseOrFail)
  .map(_._1)
  .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
  .runWith(Sink.fold(0L)(_ + _))
The fold keeps a running sum and emits the final value once the source of requests is exhausted.
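The combination of bounded parallelism and a fold can be tried without any HTTP traffic. The sketch below assumes Akka 2.6 or later and replaces writeFile with a made-up stand-in, `fakeWrite`, that simply reports a byte count. At most `parallelism` of its Futures are in flight at once, and the fold adds up the results as they complete.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{ExecutionContext, Future}

object BoundedWrites {
  // Hypothetical stand-in for writeFile: pretends to write `size` bytes and reports the count.
  def fakeWrite(size: Int)(implicit ec: ExecutionContext): Future[Long] =
    Future(size.toLong)

  def totalBytes(sizes: List[Int], parallelism: Int)(implicit system: ActorSystem): Future[Long] = {
    import system.dispatcher // execution context used to run the fake writes
    Source(sizes)
      // Upstream is only pulled while fewer than `parallelism` Futures are pending.
      .mapAsyncUnordered(parallelism)(fakeWrite(_))
      // The materialized Future completes with the sum when the source is exhausted.
      .runWith(Sink.fold(0L)(_ + _))
  }
}
```

Because addition is commutative, the unordered variant is safe here; mapAsync would be the choice if the downstream stage depended on the original request order.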
Using the Play Web Services client, injected as ws (remember to import scala.concurrent.duration._):
def downloadFromUrl(url: String)(ws: WSClient): Future[Try[File]] = {
  // createTempFile takes (prefix, suffix, directory); ".tmp" is the default suffix
  val file = File.createTempFile("my-prefix", ".tmp", new File("/tmp"))
  file.deleteOnExit()
  val futureResponse: Future[WSResponse] =
    ws.url(url).withMethod("GET").withRequestTimeout(5.minutes).stream()
  futureResponse.flatMap { res =>
    res.status match {
      case 200 =>
        val outputStream = java.nio.file.Files.newOutputStream(file.toPath)
        val sink = Sink.foreach[ByteString] { bytes => outputStream.write(bytes.toArray) }
        res.bodyAsSource.runWith(sink).andThen {
          case result =>
            // Close the stream whether the download succeeded or failed
            outputStream.close()
            result.get
        } map (_ => Success(file))
      case other => Future(Failure[File](new Exception("HTTP Failure, response code " + other + " : " + res.statusText)))
    }
  }
}
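As an alternative to a hand-written OutputStream sink, Akka Streams ships a file sink, FileIO.toPath, that opens and closes the file itself. The sketch below assumes Akka 2.6 or later and writes in-memory chunks so that it can run on its own; `writeChunks` is a made-up name.

```scala
import java.nio.file.Path
import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString
import scala.concurrent.Future

object WriteWithFileIO {
  // Streams the chunks into `target`, creating or truncating the file.
  // The resulting IOResult carries the number of bytes written in `count`.
  def writeChunks(chunks: List[ByteString], target: Path)(implicit system: ActorSystem): Future[IOResult] =
    Source(chunks).runWith(FileIO.toPath(target))
}
```

With a streamed response body the same sink could presumably be used as `res.bodyAsSource.runWith(FileIO.toPath(file.toPath))`, which removes the need to close the stream manually.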