组合具有相同物化值的多个来源
Combine multiple sources with same Materialized Values
akka 流中的 combine
运算符具有以下签名:
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]
我有多个来源,都具有相同的 Mat
。我需要将它们结合起来以保留 Mat
.
因此我需要一个具有以下签名的函数:
def combine[T, U](first: Source[T, Mat], second: Source[T, Mat], rest: Source[T, Mat]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, Seq[Mat]]
现有的combineMat
只接受两个输入。我需要无限。
Akka 的 combine 实现是:
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
Source.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val c = b.add(strategy(rest.size + 2))
first ~> c.in(0)
second ~> c.in(1)
@tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] =
if (i.hasNext) {
i.next() ~> c.in(idx)
combineRest(idx + 1, i)
} else SourceShape(c.out)
combineRest(2, rest.iterator)
})
它使用不支持 Mat
s 的 SourceShape
,所以我认为在这里不起作用。
同时,combineMat
的实现使用 viaMat
,这不适用于多个流。
这可能吗?
以下作品:
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{GraphDSL, Source}
import akka.stream.{Graph, SourceShape, UniformFanInShape}
import scala.collection.immutable
object Combine {
def combine[T, U, Mat](sources: immutable.Seq[Source[T, Mat]])(strategy: Int => Graph[UniformFanInShape[T, U], Mat]): Source[U, immutable.Seq[Mat]] = {
Source.fromGraph(GraphDSL.create(sources) {
implicit builder => {
sourceShapes => {
val target = builder.add(strategy(sources.size))
for ((source, index) <- sourceShapes.zipWithIndex) {
source ~> target.in(index)
}
SourceShape(target.out)
}
}
})
}
}
akka 流中的 combine
运算符具有以下签名:
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]
我有多个来源,都具有相同的 Mat
。我需要将它们结合起来以保留 Mat
.
因此我需要一个具有以下签名的函数:
def combine[T, U](first: Source[T, Mat], second: Source[T, Mat], rest: Source[T, Mat]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, Seq[Mat]]
现有的combineMat
只接受两个输入。我需要无限。
Akka 的 combine 实现是:
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
Source.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val c = b.add(strategy(rest.size + 2))
first ~> c.in(0)
second ~> c.in(1)
@tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] =
if (i.hasNext) {
i.next() ~> c.in(idx)
combineRest(idx + 1, i)
} else SourceShape(c.out)
combineRest(2, rest.iterator)
})
它使用不支持 Mat
s 的 SourceShape
,所以我认为在这里不起作用。
同时,combineMat
的实现使用 viaMat
,这不适用于多个流。
这可能吗?
以下作品:
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{GraphDSL, Source}
import akka.stream.{Graph, SourceShape, UniformFanInShape}
import scala.collection.immutable
object Combine {
def combine[T, U, Mat](sources: immutable.Seq[Source[T, Mat]])(strategy: Int => Graph[UniformFanInShape[T, U], Mat]): Source[U, immutable.Seq[Mat]] = {
Source.fromGraph(GraphDSL.create(sources) {
implicit builder => {
sourceShapes => {
val target = builder.add(strategy(sources.size))
for ((source, index) <- sourceShapes.zipWithIndex) {
source ~> target.in(index)
}
SourceShape(target.out)
}
}
})
}
}