Akka Stream 是否像 Kafka Streams 那样实现了连接语义?

Does Akka Stream Implement the Join Semantic as Kafka Streams Does?

我对 Akka Streams 很陌生,而我对 Kafka Streams 有一些经验。 Akka Streams 似乎缺少的一件事是可以将两个不同的流连接在一起。

Kafka Streams 允许使用消息的键连接来自两个不同流(或表)的信息。

Akka Streams 中有类似的东西吗?

很遗憾,简短的回答是否定的。我认为 Akka-streams 比 Kafka-Stream、Spark Streaming 或 Flink 更底层。但是,您可以更好地控制自己所做的事情。基本上,这意味着您可以构建自己的连接运算符。在 lightbend.

查看此讨论

基本上,您必须从 2 Source 中获取数据,Merge 它们并根据时间或元组数量发送到 window,计算连接,然后发出数据到Sink。我已经完成了这个 PoC(仍未完成),但我按照我在这里对你说的操作符进行操作,它正在编译和工作。基本上,我还是要在window里面加入数据。目前,我只是以小批量方式发出它们。

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{Attributes, ClosedShape, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic}

import scala.collection.mutable
import scala.concurrent.duration._

object StreamOpenGraphJoin {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("StreamOpenGraphJoin")
    val incrementSource: Source[Int, NotUsed] = Source(1 to 10).throttle(1, 1 second)
    val decrementSource: Source[Int, NotUsed] = Source(10 to 20).throttle(1, 1 second)

    def tokenizerSource(key: Int) = {
      Flow[Int].map { value =>
        (key, value)
      }
    }

    // Step 1 - setting up the fundamental for a stream graph
    val switchJoinStrategies = RunnableGraph.fromGraph(
      GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._

        // Step 2 - add partition and merge strategy
        val tokenizerShape00 = builder.add(tokenizerSource(0))
        val tokenizerShape01 = builder.add(tokenizerSource(1))

        val mergeTupleShape = builder.add(Merge[(Int, Int)](2))
        val batchFlow = Flow.fromGraph(new BatchTimerFlow[(Int, Int)](5 seconds))
        val sinkShape = builder.add(Sink.foreach[(Int, Int)](x => println(s" > sink: $x")))

        // Step 3 - tying up the components
        incrementSource ~> tokenizerShape00 ~> mergeTupleShape.in(0)
        decrementSource ~> tokenizerShape01 ~> mergeTupleShape.in(1)
        mergeTupleShape.out ~> batchFlow ~> sinkShape

        // Step 4 - return the shape
        ClosedShape
      }
    )
    // run the graph and materialize it
    val graph = switchJoinStrategies.run()
  }

  // step 0: define the shape
  class BatchTimerFlow[T](silencePeriod: FiniteDuration) extends GraphStage[FlowShape[T, T]] {
    // step 1: define the ports and the component-specific members
    val in = Inlet[T]("BatchTimerFlow.in")
    val out = Outlet[T]("BatchTimerFlow.out")

    // step 3: create the logic
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
      // mutable state
      val batch = new mutable.Queue[T]
      var open = false

      // step 4: define mutable state implement my logic here
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          try {
            val nextElement = grab(in)
            batch.enqueue(nextElement)
            Thread.sleep(50) // simulate an expensive computation
            if (open) pull(in) // send demand upstream signal, asking for another element
            else {
              // forward the element to the downstream operator
              emitMultiple(out, batch.dequeueAll(_ => true).to[collection.immutable.Iterable])
              open = true
              scheduleOnce(None, silencePeriod)
            }
          } catch {
            case e: Throwable => failStage(e)
          }
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
      override protected def onTimer(timerKey: Any): Unit = {
        open = false
      }
    }
    // step 2: construct a new shape
    override def shape: FlowShape[T, T] = FlowShape[T, T](in, out)
  }
}