Alpakka S3 从存储桶下载文件,保存到文件,并为流程的下一部分提供文件名

Alpakka S3 download file from bucket, save to file, and have filename available for next part in flow

我正在尝试构建使用 S3 密钥的代码,然后从 S3 下载这些文件,然后将该数据保存到磁盘上的文件中,并使用密钥名(流程中进一步的进程需要)并作为输出 returns key/filename。到目前为止我所拥有的是;

    val x: Sink[String, Future[IOResult]] =
      Flow[String].flatMapConcat(key => S3.download("somebucket", key)).
        withAttributes(S3Attributes.settings(useVersion1Api)).
        collect{ case Some(x) => x._1 }.
        flatMapConcat(identity).toMat(FileIO.toPath(Paths.get("???????")))(Keep.right)

我目前有下载文件,但没有; - 使用键名作为文件名 - returns 文件名(它不应该是一个接收器而是一个流)

如有任何指点,我将不胜感激。我刚开始使用 alpakka 和 akka 流。可能我需要以某种方式在元组中传递密钥,但我似乎无法弄清楚以后如何使用元组的那部分。

使用 cchantep 第一个建议可能会解决;

    val s3FileSaveFlow: Flow[String, (String, ObjectMetadata), NotUsed] =
      Flow[String].flatMapConcat(key => S3.download("somebucket", key) collect{ case Some(src) => key -> src}).
        flatMapConcat{ case (key,(src,meta)) => {
          src.to(FileIO.toPath(Paths.get(key)))
          Source.single((key,meta))
        }}
val x: Sink[String, Future[IOResult]] =
  Flow[String].flatMapConcat(key => 
    S3.download("somebucket", key).collect {
      case Some(src) => key -> SRC
    })

然后你有key和字节src

如果你想看一下Benji S3 DSL(我是其中的贡献者):

import akka.stream.scaladsl.FileIO

Flow[String].flatMapConcat { objKey =>
  s3.bucket("somebucket").obj(objKey).get().
    viaMat(FileIO.toPath(Path.get("/basedir", objKey)))(Keep.right)
}