Akka 流 - 将 ByteString 流拆分为多个文件

Akka stream - Splitting a stream of ByteString into multiple files

我正在尝试将传入的 Akka 字节流(来自 http 请求的主体,但也可能来自文件)拆分为多个定义大小的文件。

例如,如果我上传一个 10Gb 的文件,它会创建大约 10 个 1Gb 的文件。这些文件将具有随机生成的名称。我的问题是我真的不知道从哪里开始,因为我读过的所有响应和示例要么将整个块存储到内存中,要么使用基于字符串的定界符。除了我不能真正拥有 1Gb 的 "chunks",然后将它们写入磁盘..

有没有简单的方法来执行那种操作?我唯一的想法是使用类似 http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings 的东西,但转换为类似 FlowShape[ByteString, File] 的东西,将自己写入一个文件块,直到达到最大文件大小,然后创建一个新文件,等等。 ,并流回创建的文件。没有正确使用 Akka 看起来是个糟糕的想法..

提前致谢

我经常恢复到纯函数式、非 akka 技术来解决诸如此类的问题,然后 "lift" 将这些函数转换为 akka 结构。我的意思是我尝试只使用 scala "stuff" 然后尝试将这些东西包装在 akka 中......

文件创建

FileOutputStream 创作开始 "randomly generated names":

def randomFileNameGenerator : String = ??? //not specified in question

import java.io.FileOutputStream

val randomFileOutGenerator : () => FileOutputStream = 
  () => new FileOutputStream(randomFileNameGenerator)

需要某种方式来存储当前文件的 "state",例如已经写入的字节数:

case class FileState(byteCount : Int = 0, 
                     fileOut : FileOutputStream = randomFileOutGenerator())

文件写入

首先我们确定我们是否会违反给定 ByteString:

的最大文件大小阈值
import akka.util.ByteString

val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
  (state, byteString, maxBytes) =>
    state.byteCount + byteString.length > maxBytes

然后我们必须编写函数来创建一个新的 FileState 如果我们已经用完了当前的容量或者 returns 当前状态如果它仍然低于容量:

val closeFileInState : FileState => Unit = 
  (_ : FileState).fileOut.close()

val getCurrentFileState(FileState, ByteString, Int) => FileState = 
  (state, byteString, maxBytes) =>
    if(isEndOfChunk(maxBytes, state, byteString)) {
      closeFileInState(state)
      FileState()
    }
    else
      state

唯一剩下的就是写信给FileOutputStream:

val writeToFileAndReturn(FileState, ByteString) => FileState = 
  (fileState, byteString) => {
    fileState.fileOut write byteString.toArray
    fileState copy (byteCount = fileState.byteCount + byteString.size)
  }

//the signature ordering will become useful
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =    
  writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)    

在任何 GenTraversableOnce 上弃牌

在 scala 中,GenTraversableOnce 是任何具有折叠运算符的集合,无论是否平行。这些包括 Iterator、Vector、Array、Seq、scala stream、...最终的 writeToChunkedFile 函数与 GenTraversableOnce#fold:

的签名完美匹配
val anyIterable : Iterable = ???

val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes))

最后一个松散的结局;最后一个 FileOutputStream 也需要关闭。由于折叠只会发出最后一个 FileState 我们可以关闭那个:

closeFileInState(finalFileState)

Akka 流

Akka Flow 得到它的 fold from FlowOps#fold 恰好匹配 GenTraversableOnce 签名。因此,我们可以 "lift" 将我们的常规函数​​转换为流值,类似于我们使用 Iterable fold:

的方式
import akka.stream.scaladsl.Flow

def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] = 
  Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))

使用常规函数处理问题的好处在于它们可以在流之外的其他异步框架中使用,例如期货或演员。您在单元测试中也不需要 akka ActorSystem,只需要常规语言数据结构。

import akka.stream.scaladsl.Sink
import scala.concurrent.Future

def byteStringSink(maxBytes : Int) : Sink[ByteString, _] = 
  chunkerFlow(maxBytes) to (Sink foreach closeFileInState)

然后您可以使用此 Sink 来排出来自 HttpRequestHttpEntity

您可以编写自定义图表阶段。 您的问题类似于上传到亚马逊 S3 期间在 alpakka 中遇到的问题。 (google alpakka s3 连接器..他们不会让我 post 超过 2 个链接)

出于某种原因,s3 连接器 DiskBuffer 然而在发出块以进行进一步的流处理之前将整个传入的字节串源写入文件。.

我们想要的是类似于limit a source of byte strings to specific length的东西。在示例中,他们通过维护内存缓冲区将传入的 Source[ByteString, _] 限制为固定大小的 byteString 源。我采用它来处理文件。 这样做的好处是可以为这个阶段使用专用的线程池来做阻塞IO。对于良好的反应流,您希望在 actor 系统的单独线程池中保持阻塞 IO。 PS:这不会尝试生成精确大小的文件。因此,如果我们在 100MB 的文件中额外读取 2KB 的内容……我们会将这些额外的字节写入当前文件,而不是尝试获得精确的大小。

import java.io.{FileOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel
import java.nio.file.Path

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream._
import akka.util.ByteString

case class MultipartUploadChunk(path: Path, size: Int, partNumber: Int)
//Starts writing the byteStrings received from upstream to a file. Emits a path after writing a partSize number of bytes. Does not attemtp to write exact number of bytes.
class FileChunker(maxSize: Int, tempDir: Path, partSize: Int)
    extends GraphStage[FlowShape[ByteString, MultipartUploadChunk]] {

  assert(maxSize > partSize, "Max size should be larger than part size. ")

  val in: Inlet[ByteString] = Inlet[ByteString]("PartsMaker.in")
  val out: Outlet[MultipartUploadChunk] = Outlet[MultipartUploadChunk]("PartsMaker.out")

  override val shape: FlowShape[ByteString, MultipartUploadChunk] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {

      var partNumber: Int = 0
      var length: Int = 0
      var currentBuffer: Option[PartBuffer] = None

      override def onPull(): Unit =
        if (isClosed(in)) {
          emitPart(currentBuffer, length)
        } else {
          pull(in)
        }

      override def onPush(): Unit = {
        val elem = grab(in)
        length += elem.size
        val currentPart: PartBuffer = currentBuffer match {
          case Some(part) => part
          case None =>
            val newPart = createPart(partNumber)
            currentBuffer = Some(newPart)
            newPart
        }
        currentPart.fileChannel.write(elem.asByteBuffer)
        if (length > partSize) {
          emitPart(currentBuffer, length)
          //3. .increment part number, reset length.
          partNumber += 1
          length = 0
        } else {
          pull(in)
        }
      }

      override def onUpstreamFinish(): Unit =
        if (length > 0) emitPart(currentBuffer, length) // emit part only if something is still left in current buffer.

      private def emitPart(maybePart: Option[PartBuffer], size: Int): Unit = maybePart match {
        case Some(part) =>
          //1. flush the part buffer and truncate the file.
          part.fileChannel.force(false)
          //          not sure why we do this truncate.. but was being done in alpakka. also maybe safe to do.
//                    val ch = new FileOutputStream(part.path.toFile).getChannel
//          try {
//            println(s"truncating to size $size")
//            ch.truncate(size)
//          } finally {
//            ch.close()
//          }
          //2emit the part
          val chunk = MultipartUploadChunk(path = part.path, size = length, partNumber = partNumber)
          push(out, chunk)
          part.fileChannel.close() // TODO: probably close elsewhere.
          currentBuffer = None
          //complete stage if in is closed.
          if (isClosed(in)) completeStage()
        case None => if (isClosed(in)) completeStage()
      }

      private def createPart(partNum: Int): PartBuffer = {
        val path: Path = partFile(partNum)
        //currentPart.deleteOnExit() //TODO: Enable in prod. requests that the file be deleted when VM dies.
        PartBuffer(path, new RandomAccessFile(path.toFile, "rw").getChannel)
      }

      /**
       * Creates a file in the temp directory with name bmcs-buffer-part-$partNumber
       * @param partNumber the part number in multipart upload.
       * @return
       * TODO:add unique id to the file name. for multiple
       */
      private def partFile(partNumber: Int): Path =
        tempDir.resolve(s"bmcs-buffer-part-$partNumber.bin")
      setHandlers(in, out, this)
    }

  case class PartBuffer(path: Path, fileChannel: FileChannel) //TODO:  see if you need mapped byte buffer. might be ok with just output stream / channel.

}

ByteString 流拆分为多个文件的惯用方法是使用 Alpakka 的 LogRotatorSink. From the documentation:

This sink will takes a function as parameter which returns a Bytestring => Option[Path] function. If the generated function returns a path the sink will rotate the file output to this new path and the actual ByteString will be written to this new file too. With this approach the user can define a custom stateful file generation implementation.

以下fileSizeRotationFunction也来自文档:

val fileSizeRotationFunction = () => {
  val max = 10 * 1024 * 1024
  var size: Long = max
  (element: ByteString) =>
    {
      if (size + element.size > max) {
        val path = Files.createTempFile("out-", ".log")
        size = element.size
        Some(path)
      } else {
        size += element.size
        None
      }
    }
}

使用示例:

val source: Source[ByteString, _] = ???
source.runWith(LogRotatorSink(fileSizeRotationFunction))