如何组合 2 个不同类型的水槽?
How to combine 2 Sinks of a different type?
我想使用 Alpakka 上传文件到 S3,同时使用 Tika 解析它以获得它的 MimeType。
我现在有 3 个部分的图表:
val fileSource: Source[ByteString, Any] // comes from Akka-HTTP
val fileUpload: Sink[ByteString, Future[MultipartUploadResult]] // created by S3Client from Alpakka
val mimeTypeDetection: Sink[ByteString, Future[MediaType.Binary]] // my implementation using Apache Tika
我想在一个地方获得两个结果,例如:
Future[(MultipartUploadResult, MediaType.Binary)]
广播部分没有问题:
val broadcast = builder.add(Broadcast[ByteString](2))
source ~> broadcast ~> fileUpload
broadcast ~> mimeTypeDetection
但是我在编写 Sinks 时遇到了麻烦。我在 API 和文档中找到的方法假定组合接收器属于同一类型,或者我是 Zipping Flows,而不是 Sinks。
在这种情况下建议采用什么方法?
两种方式:
1) 使用 alsoToMat
(更简单,无需 GraphDSL,足以满足您的示例)
val mat1: (Future[MultipartUploadResult], Future[Binary]) =
fileSource
.alsoToMat(fileUpload)(Keep.right)
.toMat(mimeTypeDetection)(Keep.both)
.run()
2) 使用具有自定义物化值的 GraphDSL(更冗长,更灵活)。 docs)
中的更多信息
val mat2: (Future[MultipartUploadResult], Future[Binary]) =
RunnableGraph.fromGraph(GraphDSL.create(fileUpload, mimeTypeDetection)((_, _)) { implicit builder =>
(fileUpload, mimeTypeDetection) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[ByteString](2))
fileSource ~> broadcast ~> fileUpload
broadcast ~> mimeTypeDetection
ClosedShape
}).run()
我想使用 Alpakka 上传文件到 S3,同时使用 Tika 解析它以获得它的 MimeType。
我现在有 3 个部分的图表:
val fileSource: Source[ByteString, Any] // comes from Akka-HTTP
val fileUpload: Sink[ByteString, Future[MultipartUploadResult]] // created by S3Client from Alpakka
val mimeTypeDetection: Sink[ByteString, Future[MediaType.Binary]] // my implementation using Apache Tika
我想在一个地方获得两个结果,例如:
Future[(MultipartUploadResult, MediaType.Binary)]
广播部分没有问题:
val broadcast = builder.add(Broadcast[ByteString](2))
source ~> broadcast ~> fileUpload
broadcast ~> mimeTypeDetection
但是我在编写 Sinks 时遇到了麻烦。我在 API 和文档中找到的方法假定组合接收器属于同一类型,或者我是 Zipping Flows,而不是 Sinks。
在这种情况下建议采用什么方法?
两种方式:
1) 使用 alsoToMat
(更简单,无需 GraphDSL,足以满足您的示例)
val mat1: (Future[MultipartUploadResult], Future[Binary]) =
fileSource
.alsoToMat(fileUpload)(Keep.right)
.toMat(mimeTypeDetection)(Keep.both)
.run()
2) 使用具有自定义物化值的 GraphDSL(更冗长,更灵活)。 docs)
中的更多信息 val mat2: (Future[MultipartUploadResult], Future[Binary]) =
RunnableGraph.fromGraph(GraphDSL.create(fileUpload, mimeTypeDetection)((_, _)) { implicit builder =>
(fileUpload, mimeTypeDetection) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[ByteString](2))
fileSource ~> broadcast ~> fileUpload
broadcast ~> mimeTypeDetection
ClosedShape
}).run()