组合具有相同物化值的多个来源

Combine multiple sources with same Materialized Values

akka 流中的 combine 运算符具有以下签名:

  def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]

我有多个来源,都具有相同的 Mat。我需要将它们结合起来以保留 Mat.

因此我需要一个具有以下签名的函数:

  def combine[T, U](first: Source[T, Mat], second: Source[T, Mat], rest: Source[T, Mat]*)(
      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, Seq[Mat]]

现有的combineMat只接受两个输入。我需要无限。

Akka 的 combine 实现是:

  def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
    Source.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val c = b.add(strategy(rest.size + 2))
      first ~> c.in(0)
      second ~> c.in(1)

      @tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] =
        if (i.hasNext) {
          i.next() ~> c.in(idx)
          combineRest(idx + 1, i)
        } else SourceShape(c.out)

      combineRest(2, rest.iterator)
    })

它使用不支持 Mats 的 SourceShape,所以我认为在这里不起作用。

同时,combineMat 的实现使用 viaMat,这不适用于多个流。

这可能吗?

以下作品:

import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{GraphDSL, Source}
import akka.stream.{Graph, SourceShape, UniformFanInShape}

import scala.collection.immutable

object Combine {
  def combine[T, U, Mat](sources: immutable.Seq[Source[T, Mat]])(strategy: Int => Graph[UniformFanInShape[T, U], Mat]): Source[U, immutable.Seq[Mat]] = {
    Source.fromGraph(GraphDSL.create(sources) {
      implicit builder => {
        sourceShapes => {
          val target = builder.add(strategy(sources.size))

          for ((source, index) <- sourceShapes.zipWithIndex) {
            source ~> target.in(index)
          }

          SourceShape(target.out)
        }
      }
    })
  }
}