Akka 流扩展 dsal 和 off-rabbit

Akka Streams scala DSL and Op-Rabbit

我已经开始使用 Akka Streams 和 Op-Rabbit,但有点困惑。

我需要根据谓词拆分流,然后将它们组合起来,就像我在创建图形和使用分区和合并时所做的那样。

我已经能够使用 GraphDSL.Builder 来做这样的事情,但似乎无法让它与 AckedSource/Flow/Sink

一起工作

图表看起来像:

                        | --> flow1 --> |
source--> partition --> |               | --> flow3 --> sink
                        | --> flow2 --> |

我不确定我是否应该使用 splitWhen,因为我总是恰好需要 2 个流程。

这是一个不做分区也不使用GraphDSL.Builder:

的示例
def splitExample(source: AckedSource[String, SubscriptionRef],
                 queueName: String)
                (implicit actorSystem: ActorSystem): RunnableGraph[SubscriptionRef] = {
  val toStringFlow: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]]
    .map[AckTup[String]](tup => {
      val (p,m) = tup
      (p, new String(m.data))
    })

  val printFlow1: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
    .map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"flow1 processing $s")
      tup
     })

  val printFlow2: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
    .map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"flow2 processing $s")
      tup
    })

  source
    .map(Message.queue(_, queueName))
    .via(AckedFlow(toStringFlow))
    // partition if string.length < 10
    .via(AckedFlow(printFlow1))
    .via(AckedFlow(printFlow2))
    .to(AckedSink.ack)
}

这是我似乎无法运行的代码:

import GraphDSL.Implicits._
def buildModelAcked(source: AckedSource[String, SubscriptionRef] , queueName: String)(implicit actorSystem: ActorSystem):  Graph[ClosedShape, Future[Done]] = {
    import GraphDSL.Implicits._
    GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
    import GraphDSL.Implicits._
    source.map(Message.queue(_, queueName)) ~> AckedFlow(toStringFlow) ~> AckedSink.ack
//      source.map(Message.queue(_, queueName)).via(AckedFlow(toStringFlow)).to(AckedSink.ack)
    ClosedShape

}}

编译器无法解析 ~> 运算符

所以我的问题是:

  1. 是否有使用 scala dsl 构建 Acked/Source/Flow/Sink 图表的示例项目?

  2. 是否有与我在这里尝试做的类似的分区和合并示例项目?

处理 acked-stream.

时请记住以下定义
  1. AckedSource[Out, Mat]Source[AckTup[Out], Mat]]
  2. 的包装器
  3. AckedFlow[In, Out, Mat]Flow[AckTup[In], AckTup[Out], Mat]
  4. 的包装器
  5. AckedSink[In, Mat]Sink[AckTup[In], Mat]
  6. 的包装器
  7. AckTup[T](Promise[Unit], T)
  8. 的别名
  9. 经典流组合器将对 AckTup
  10. T 部分进行操作
  11. .acked 组合器将完成 AckedFlow
  12. Promise[Unit]

GraphDSL 边缘运算符 (~>) 将适用于一堆 Akka 预定义形状(参见 GraphDSL.Implicits 的代码),但它不适用于 acked 定义的自定义形状-流库。

你有 2 条出路:

  1. 您可以按照 GraphDSL.Implicits
  2. 中的方式定义自己的 ~> 隐式运算符
  3. 您打开已确认的阶段以获得标准阶段。您可以使用 .wrappedRepr 访问包装阶段 - 在 AckedSourceAckedFlowAckedSink.
  4. 上可用

根据 Stefano Bonetti 的出色指导,这里有一个可能的解决方案:

graph:    
                        |--> short --|
  rabbitMq --> before --|            |--> after
                        |--> long  --|

解决方案:

val before: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]].map[AckTup[String]](tup => {
  val (p,m) = tup
  (p, new String(m.data))
})

val short: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
  val (p, s) = tup
  println(s"short: $s")
  tup
})
val long: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
  val (p, s) = tup
  println(s"long: $s")
  tup
})
val after: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
  val (p, s) = tup
  println(s"all $s")
  tup
})

def buildSplitGraph(source: AckedSource[String, SubscriptionRef]
                    , queueName: String
                    , splitLength: Int)(implicit actorSystem: ActorSystem):  Graph[ClosedShape, Future[Done]] = {
 GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
   val toShort = 0
   val toLong = 1

   // junctions
   val split = builder.add(Partition[AckTup[String]](2, (tup: AckTup[String]) => {
                                                           val (p, s) = tup
                                                           if (s.length < splitLength) toShort else toLong
                                                         }
   ))
   val merge = builder.add(Merge[AckTup[String]](2))

   //graph
   val beforeSplit = source.map(Message.queue(_, queueName)).wrappedRepr ~> AckedFlow(before).wrappedRepr
   beforeSplit ~> split
   // must do short, then long since the split goes in that order
   split ~> AckedFlow(short).wrappedRepr ~> merge
   split ~> AckedFlow(long).wrappedRepr ~> merge
   // after the last AckedFlow, be sure to '.acked' so that the message will be removed from the queue
   merge ~> AckedFlow(after).acked ~> s

  ClosedShape
}}

正如 Stefano Bonetti 所说,关键是使用与 AckedFlow 关联的 .wrappedRepr,然后在最后一步使用 .acked 组合器。