MergeLatest 的默认值
Default value for MergeLatest
MergeLatest
的官方 documentation 声明:
MergeLatest emits list for each element emitted from some input stream, but only after each input stream emitted at least one element.
我的问题是:这个可以绕过吗?
例如,我们能否提供一个默认值,使其在从任何输入流中至少接收到一个元素后立即开始生成列表?
以下应该是新行为:
(1,0,0)
(2,0,0)
(2,1,0)
(2,1,1)
(2,1,2)
而不是:
(2,1,1)
(2,1,2)
因为我也需要将那些第一个列表推送到输出流
对于每个传入流,您可以使用 Source.single(0).concat
发出一个零,然后发出流的其余部分:
def withInitialValue[A](source: Source[A, NotUsed], a: A): Source[A, NotUsed] =
Source.single(a).concat(source)
很遗憾,mergeLatest
不提供此类选项。而且似乎没有任何 Stream 运算符可以轻松做到这一点。一种方法是根据特定需要重新调整 MergeLatest
的用途。好消息是必要的代码更改相当简单,因为相关代码实现是 UniformFanInShape
的标准 GraphStage
。
import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, Inlet, Outlet, UniformFanInShape }
import scala.collection.immutable
object MergeLatestWithDefault {
def apply[T](inputPorts: Int, default: T, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] =
new MergeLatestWithDefault[T, List[T]](inputPorts, default, eagerComplete)(_.toList)
}
final class MergeLatestWithDefault[T, M](val inputPorts: Int, val default: T, val eagerClose: Boolean)(buildElem: Array[T] => M)
extends GraphStage[UniformFanInShape[T, M]] {
require(inputPorts >= 1, "input ports must be >= 1")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatestWithDefault.in" + i))
val out: Outlet[M] = Outlet[M]("MergeLatestWithDefault.out")
override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]()
private var runningUpstreams: Int = inputPorts
private def upstreamsClosed: Boolean = runningUpstreams == 0
private val messages: Array[Any] = Array.fill[Any](inputPorts)(default)
override def preStart(): Unit = in.foreach(tryPull)
in.zipWithIndex.foreach {
case (input, index) =>
setHandler(
input,
new InHandler {
override def onPush(): Unit = {
messages.update(index, grab(input))
activeStreams.add(index)
emit(out, buildElem(messages.asInstanceOf[Array[T]]))
tryPull(input)
}
override def onUpstreamFinish(): Unit = {
if (!eagerClose) {
runningUpstreams -= 1
if (upstreamsClosed) completeStage()
} else completeStage()
}
})
}
override def onPull(): Unit = {
var i = 0
while (i < inputPorts) {
if (!hasBeenPulled(in(i))) tryPull(in(i))
i += 1
}
}
setHandler(out, this)
}
override def toString = "MergeLatestWithDefault"
}
在这种情况下只需要很少的代码更改。除了 default
的附加参数被填充到数组 messages
之外,唯一的变化是 onPush
中的 emit
不再是有条件的。
正在测试:
import akka.actor.ActorSystem
object CustomMerge {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("system")
val s1 = Source(1 to 3)
val s2 = Source(11 to 13).throttle(1, 50.millis)
val s3 = Source(101 to 103).throttle(1, 100.millis)
Source.combine(s1, s2, s3)(MergeLatestWithDefault[Int](_, 0)).runForeach(println)
}
}
// Output:
//
// List(1, 0, 0)
// List(1, 11, 0)
// List(1, 11, 101)
// List(2, 11, 101)
// List(2, 12, 101)
// List(3, 12, 101)
// List(3, 13, 101)
// List(3, 13, 102)
// List(3, 13, 103)
作为奖励,虽然 mergeLatest
仅在 Akka Stream 2.6
+ 上可用,但根据我的简短测试,此重新调整用途的代码似乎在 2.5
上运行良好。
MergeLatest
的官方 documentation 声明:
MergeLatest emits list for each element emitted from some input stream, but only after each input stream emitted at least one element.
我的问题是:这个可以绕过吗? 例如,我们能否提供一个默认值,使其在从任何输入流中至少接收到一个元素后立即开始生成列表?
以下应该是新行为:
(1,0,0)
(2,0,0)
(2,1,0)
(2,1,1)
(2,1,2)
而不是:
(2,1,1)
(2,1,2)
因为我也需要将那些第一个列表推送到输出流
对于每个传入流,您可以使用 Source.single(0).concat
发出一个零,然后发出流的其余部分:
def withInitialValue[A](source: Source[A, NotUsed], a: A): Source[A, NotUsed] =
Source.single(a).concat(source)
很遗憾,mergeLatest
不提供此类选项。而且似乎没有任何 Stream 运算符可以轻松做到这一点。一种方法是根据特定需要重新调整 MergeLatest
的用途。好消息是必要的代码更改相当简单,因为相关代码实现是 UniformFanInShape
的标准 GraphStage
。
import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, Inlet, Outlet, UniformFanInShape }
import scala.collection.immutable
object MergeLatestWithDefault {
def apply[T](inputPorts: Int, default: T, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] =
new MergeLatestWithDefault[T, List[T]](inputPorts, default, eagerComplete)(_.toList)
}
final class MergeLatestWithDefault[T, M](val inputPorts: Int, val default: T, val eagerClose: Boolean)(buildElem: Array[T] => M)
extends GraphStage[UniformFanInShape[T, M]] {
require(inputPorts >= 1, "input ports must be >= 1")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatestWithDefault.in" + i))
val out: Outlet[M] = Outlet[M]("MergeLatestWithDefault.out")
override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]()
private var runningUpstreams: Int = inputPorts
private def upstreamsClosed: Boolean = runningUpstreams == 0
private val messages: Array[Any] = Array.fill[Any](inputPorts)(default)
override def preStart(): Unit = in.foreach(tryPull)
in.zipWithIndex.foreach {
case (input, index) =>
setHandler(
input,
new InHandler {
override def onPush(): Unit = {
messages.update(index, grab(input))
activeStreams.add(index)
emit(out, buildElem(messages.asInstanceOf[Array[T]]))
tryPull(input)
}
override def onUpstreamFinish(): Unit = {
if (!eagerClose) {
runningUpstreams -= 1
if (upstreamsClosed) completeStage()
} else completeStage()
}
})
}
override def onPull(): Unit = {
var i = 0
while (i < inputPorts) {
if (!hasBeenPulled(in(i))) tryPull(in(i))
i += 1
}
}
setHandler(out, this)
}
override def toString = "MergeLatestWithDefault"
}
在这种情况下只需要很少的代码更改。除了 default
的附加参数被填充到数组 messages
之外,唯一的变化是 onPush
中的 emit
不再是有条件的。
正在测试:
import akka.actor.ActorSystem
object CustomMerge {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("system")
val s1 = Source(1 to 3)
val s2 = Source(11 to 13).throttle(1, 50.millis)
val s3 = Source(101 to 103).throttle(1, 100.millis)
Source.combine(s1, s2, s3)(MergeLatestWithDefault[Int](_, 0)).runForeach(println)
}
}
// Output:
//
// List(1, 0, 0)
// List(1, 11, 0)
// List(1, 11, 101)
// List(2, 11, 101)
// List(2, 12, 101)
// List(3, 12, 101)
// List(3, 13, 101)
// List(3, 13, 102)
// List(3, 13, 103)
作为奖励,虽然 mergeLatest
仅在 Akka Stream 2.6
+ 上可用,但根据我的简短测试,此重新调整用途的代码似乎在 2.5
上运行良好。