并行后 akka 流停止

akka streams stops after parallelism

我试图用 akka-streams(以及对它的有限理解)和 Apache 的 pdfbox 构建一个小型 PDF 解析器。 我没有真正理解的一件事是,在 mapAsync 中给出的给定数量的 parallelism 之后,流就停止了。 所以如果一个 PDF-doc 有 20 页并且并行度设置为 5,前 5 页被处理,其余的被忽略,如果设置为 20,一切都很好。有人知道我做错了什么吗?

  class PdfParser(ws: WSClient, conf: Configuration, parallelism: Int) {

  implicit val system = ActorSystem("image-parser")

  implicit val materializer = ActorMaterializer()

  def documentPages(doc: PDDocument, key: String): Iterator[Page] = {

    val pages: util.List[_] = doc.getDocumentCatalog.getAllPages
    val pageList = (for {
      i ← 0 until pages.size()
      page = pages.get(i)
    } yield Page(page, s"$key-$i.jpg")).toIterator
    pageList
  }

  val pageToImage: Flow[Page, Image, NotUsed] = Flow[Page].map { p ⇒
    val img = p.content.asInstanceOf[PDPage].convertToImage()
    Image(img, p.name)
  }

  val imageToS3: Flow[Image, String, NotUsed] = Flow[Image].mapAsync(parallelism) { i ⇒

    val s3 = S3.fromConfiguration(ws, conf)
    val bucket = s3.getBucket("elsa-essays")
    val baos = new ByteArrayOutputStream()
    ImageIO.write(i.content, "jpg", baos)
    val res = bucket add BucketFile(i.name, "image/jpeg", baos.toByteArray)
    res.map { _ ⇒
      "uploaded"
    }.recover {
      case e: S3Exception ⇒ e.message
    }
  }

  val sink: Sink[String, Future[String]] = Sink.head[String]

  def parse(path: Path, key: String): Future[String] = {

    val stream: InputStream = new FileInputStream(path.toString)
    val doc = PDDocument.load(stream)
    val source = Source.fromIterator(() ⇒ documentPages(doc, key))

    val runnable: RunnableGraph[Future[String]] = source.via(pageToImage).via(imageToS3).toMat(sink)(Keep.right)
    val res = runnable.run()

    res.map { s ⇒
      doc.close()
      stream.close()
      s
    }

  }

}

问题出在您的接收器上。 Sink.head 将 return 来自您的物化流的一个元素。所以问题是,为什么在流实现中使用 mapAsync(>1) 时它会收到多个值?可能是因为它使用了不止一个演员来向下游推动价值。

无论如何,请将水槽更改为:

val sink: Sink[String, Future[String]] = Sink.fold("")((a, b) => b ++ a)

它会起作用。