MergeLatest 的默认值

Default value for MergeLatest

MergeLatest 的官方 documentation 声明:

MergeLatest emits list for each element emitted from some input stream, but only after each input stream emitted at least one element.

我的问题是:这个可以绕过吗? 例如,我们能否提供一个默认值,使其在从任何输入流中至少接收到一个元素后立即开始生成列表?

以下应该是新行为:

(1,0,0)
(2,0,0)
(2,1,0)
(2,1,1)
(2,1,2)

而不是:

(2,1,1)
(2,1,2)

因为我也需要将那些第一个列表推送到输出流

对于每个传入流,您可以使用 Source.single(0).concat 发出一个零,然后发出流的其余部分:

def withInitialValue[A](source: Source[A, NotUsed], a: A): Source[A, NotUsed] =
  Source.single(a).concat(source)

很遗憾,mergeLatest 不提供此类选项。而且似乎没有任何 Stream 运算符可以轻松做到这一点。一种方法是根据特定需要重新调整 MergeLatest 的用途。好消息是必要的代码更改相当简单,因为相关代码实现是 UniformFanInShape 的标准 GraphStage

import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, Inlet, Outlet, UniformFanInShape }
import scala.collection.immutable

object MergeLatestWithDefault {
  def apply[T](inputPorts: Int, default: T, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] =
    new MergeLatestWithDefault[T, List[T]](inputPorts, default, eagerComplete)(_.toList)
}

final class MergeLatestWithDefault[T, M](val inputPorts: Int, val default: T, val eagerClose: Boolean)(buildElem: Array[T] => M)
    extends GraphStage[UniformFanInShape[T, M]] {
  require(inputPorts >= 1, "input ports must be >= 1")

  val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatestWithDefault.in" + i))
  val out: Outlet[M] = Outlet[M]("MergeLatestWithDefault.out")
  override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]()
      private var runningUpstreams: Int = inputPorts
      private def upstreamsClosed: Boolean = runningUpstreams == 0
      private val messages: Array[Any] = Array.fill[Any](inputPorts)(default)

      override def preStart(): Unit = in.foreach(tryPull)

      in.zipWithIndex.foreach {
        case (input, index) =>
          setHandler(
            input,
            new InHandler {
              override def onPush(): Unit = {
                messages.update(index, grab(input))
                activeStreams.add(index)
                emit(out, buildElem(messages.asInstanceOf[Array[T]]))
                tryPull(input)
              }

              override def onUpstreamFinish(): Unit = {
                if (!eagerClose) {
                  runningUpstreams -= 1
                  if (upstreamsClosed) completeStage()
                } else completeStage()
              }
            })
      }

      override def onPull(): Unit = {
        var i = 0
        while (i < inputPorts) {
          if (!hasBeenPulled(in(i))) tryPull(in(i))
          i += 1
        }
      }

      setHandler(out, this)
    }

  override def toString = "MergeLatestWithDefault"
}

在这种情况下只需要很少的代码更改。除了 default 的附加参数被填充到数组 messages 之外,唯一的变化是 onPush 中的 emit 不再是有条件的。

正在测试:

import akka.actor.ActorSystem

object CustomMerge {

  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem("system")

    val s1 = Source(1 to 3)
    val s2 = Source(11 to 13).throttle(1, 50.millis)
    val s3 = Source(101 to 103).throttle(1, 100.millis)

    Source.combine(s1, s2, s3)(MergeLatestWithDefault[Int](_, 0)).runForeach(println)
  }
}

// Output:
//
// List(1, 0, 0)
// List(1, 11, 0)
// List(1, 11, 101)
// List(2, 11, 101)
// List(2, 12, 101)
// List(3, 12, 101)
// List(3, 13, 101)
// List(3, 13, 102)
// List(3, 13, 103)

作为奖励,虽然 mergeLatest 仅在 Akka Stream 2.6+ 上可用,但根据我的简短测试,此重新调整用途的代码似乎在 2.5 上运行良好。