如何组合 2 个不同类型的水槽?

How to combine 2 Sinks of a different type?

我想使用 Alpakka 上传文件到 S3,同时使用 Tika 解析它以获得它的 MimeType。

我现在有 3 个部分的图表:

val fileSource: Source[ByteString, Any] // comes from Akka-HTTP
val fileUpload: Sink[ByteString, Future[MultipartUploadResult]] // created by S3Client from Alpakka
val mimeTypeDetection: Sink[ByteString, Future[MediaType.Binary]] // my implementation using Apache Tika

我想在一个地方获得两个结果,例如:

Future[(MultipartUploadResult, MediaType.Binary)]

广播部分没有问题:

val broadcast = builder.add(Broadcast[ByteString](2))

source ~> broadcast ~> fileUpload
          broadcast ~> mimeTypeDetection

但是我在编写 Sinks 时遇到了麻烦。我在 API 和文档中找到的方法假定组合接收器属于同一类型,或者我是 Zipping Flows,而不是 Sinks。

在这种情况下建议采用什么方法?

两种方式:

1) 使用 alsoToMat(更简单,无需 GraphDSL,足以满足您的示例)

  val mat1: (Future[MultipartUploadResult], Future[Binary]) =
    fileSource
    .alsoToMat(fileUpload)(Keep.right)
    .toMat(mimeTypeDetection)(Keep.both)
    .run()

2) 使用具有自定义物化值的 GraphDSL(更冗长,更灵活)。 docs)

中的更多信息
  val mat2: (Future[MultipartUploadResult], Future[Binary]) = 
    RunnableGraph.fromGraph(GraphDSL.create(fileUpload, mimeTypeDetection)((_, _)) { implicit builder =>
      (fileUpload, mimeTypeDetection) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[ByteString](2))

        fileSource ~> broadcast ~> fileUpload
                      broadcast ~> mimeTypeDetection
        ClosedShape
    }).run()