并行后 akka 流停止
akka streams stops after parallelism
我试图用 akka-streams(以及对它的有限理解)和 Apache 的 pdfbox 构建一个小型 PDF 解析器。
我没有真正理解的一件事是,在 mapAsync 中给出的给定数量的 parallelism 之后,流就停止了。
所以如果一个 PDF-doc 有 20 页并且并行度设置为 5,前 5 页被处理,其余的被忽略,如果设置为 20,一切都很好。有人知道我做错了什么吗?
class PdfParser(ws: WSClient, conf: Configuration, parallelism: Int) {
implicit val system = ActorSystem("image-parser")
implicit val materializer = ActorMaterializer()
def documentPages(doc: PDDocument, key: String): Iterator[Page] = {
val pages: util.List[_] = doc.getDocumentCatalog.getAllPages
val pageList = (for {
i ← 0 until pages.size()
page = pages.get(i)
} yield Page(page, s"$key-$i.jpg")).toIterator
pageList
}
val pageToImage: Flow[Page, Image, NotUsed] = Flow[Page].map { p ⇒
val img = p.content.asInstanceOf[PDPage].convertToImage()
Image(img, p.name)
}
val imageToS3: Flow[Image, String, NotUsed] = Flow[Image].mapAsync(parallelism) { i ⇒
val s3 = S3.fromConfiguration(ws, conf)
val bucket = s3.getBucket("elsa-essays")
val baos = new ByteArrayOutputStream()
ImageIO.write(i.content, "jpg", baos)
val res = bucket add BucketFile(i.name, "image/jpeg", baos.toByteArray)
res.map { _ ⇒
"uploaded"
}.recover {
case e: S3Exception ⇒ e.message
}
}
val sink: Sink[String, Future[String]] = Sink.head[String]
def parse(path: Path, key: String): Future[String] = {
val stream: InputStream = new FileInputStream(path.toString)
val doc = PDDocument.load(stream)
val source = Source.fromIterator(() ⇒ documentPages(doc, key))
val runnable: RunnableGraph[Future[String]] = source.via(pageToImage).via(imageToS3).toMat(sink)(Keep.right)
val res = runnable.run()
res.map { s ⇒
doc.close()
stream.close()
s
}
}
}
问题出在您的接收器上。 Sink.head 将 return 来自您的物化流的一个元素。所以问题是,为什么在流实现中使用 mapAsync(>1) 时它会收到多个值?可能是因为它使用了不止一个演员来向下游推动价值。
无论如何,请将水槽更改为:
val sink: Sink[String, Future[String]] = Sink.fold("")((a, b) => b ++ a)
它会起作用。
我试图用 akka-streams(以及对它的有限理解)和 Apache 的 pdfbox 构建一个小型 PDF 解析器。 我没有真正理解的一件事是,在 mapAsync 中给出的给定数量的 parallelism 之后,流就停止了。 所以如果一个 PDF-doc 有 20 页并且并行度设置为 5,前 5 页被处理,其余的被忽略,如果设置为 20,一切都很好。有人知道我做错了什么吗?
class PdfParser(ws: WSClient, conf: Configuration, parallelism: Int) {
implicit val system = ActorSystem("image-parser")
implicit val materializer = ActorMaterializer()
def documentPages(doc: PDDocument, key: String): Iterator[Page] = {
val pages: util.List[_] = doc.getDocumentCatalog.getAllPages
val pageList = (for {
i ← 0 until pages.size()
page = pages.get(i)
} yield Page(page, s"$key-$i.jpg")).toIterator
pageList
}
val pageToImage: Flow[Page, Image, NotUsed] = Flow[Page].map { p ⇒
val img = p.content.asInstanceOf[PDPage].convertToImage()
Image(img, p.name)
}
val imageToS3: Flow[Image, String, NotUsed] = Flow[Image].mapAsync(parallelism) { i ⇒
val s3 = S3.fromConfiguration(ws, conf)
val bucket = s3.getBucket("elsa-essays")
val baos = new ByteArrayOutputStream()
ImageIO.write(i.content, "jpg", baos)
val res = bucket add BucketFile(i.name, "image/jpeg", baos.toByteArray)
res.map { _ ⇒
"uploaded"
}.recover {
case e: S3Exception ⇒ e.message
}
}
val sink: Sink[String, Future[String]] = Sink.head[String]
def parse(path: Path, key: String): Future[String] = {
val stream: InputStream = new FileInputStream(path.toString)
val doc = PDDocument.load(stream)
val source = Source.fromIterator(() ⇒ documentPages(doc, key))
val runnable: RunnableGraph[Future[String]] = source.via(pageToImage).via(imageToS3).toMat(sink)(Keep.right)
val res = runnable.run()
res.map { s ⇒
doc.close()
stream.close()
s
}
}
}
问题出在您的接收器上。 Sink.head 将 return 来自您的物化流的一个元素。所以问题是,为什么在流实现中使用 mapAsync(>1) 时它会收到多个值?可能是因为它使用了不止一个演员来向下游推动价值。
无论如何,请将水槽更改为:
val sink: Sink[String, Future[String]] = Sink.fold("")((a, b) => b ++ a)
它会起作用。