Akka 流扩展 dsal 和 off-rabbit
Akka Streams scala DSL and Op-Rabbit
我已经开始使用 Akka Streams 和 Op-Rabbit,但有点困惑。
我需要根据谓词拆分流,然后将它们组合起来,就像我在创建图形和使用分区和合并时所做的那样。
我已经能够使用 GraphDSL.Builder 来做这样的事情,但似乎无法让它与 AckedSource/Flow/Sink
一起工作
图表看起来像:
| --> flow1 --> |
source--> partition --> | | --> flow3 --> sink
| --> flow2 --> |
我不确定我是否应该使用 splitWhen,因为我总是恰好需要 2 个流程。
这是一个不做分区也不使用GraphDSL.Builder:
的示例
def splitExample(source: AckedSource[String, SubscriptionRef],
queueName: String)
(implicit actorSystem: ActorSystem): RunnableGraph[SubscriptionRef] = {
val toStringFlow: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]]
.map[AckTup[String]](tup => {
val (p,m) = tup
(p, new String(m.data))
})
val printFlow1: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
.map[AckTup[String]](tup => {
val (p, s) = tup
println(s"flow1 processing $s")
tup
})
val printFlow2: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
.map[AckTup[String]](tup => {
val (p, s) = tup
println(s"flow2 processing $s")
tup
})
source
.map(Message.queue(_, queueName))
.via(AckedFlow(toStringFlow))
// partition if string.length < 10
.via(AckedFlow(printFlow1))
.via(AckedFlow(printFlow2))
.to(AckedSink.ack)
}
这是我似乎无法运行的代码:
import GraphDSL.Implicits._
def buildModelAcked(source: AckedSource[String, SubscriptionRef] , queueName: String)(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = {
import GraphDSL.Implicits._
GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
import GraphDSL.Implicits._
source.map(Message.queue(_, queueName)) ~> AckedFlow(toStringFlow) ~> AckedSink.ack
// source.map(Message.queue(_, queueName)).via(AckedFlow(toStringFlow)).to(AckedSink.ack)
ClosedShape
}}
编译器无法解析 ~>
运算符
所以我的问题是:
是否有使用 scala dsl 构建 Acked/Source/Flow/Sink 图表的示例项目?
是否有与我在这里尝试做的类似的分区和合并示例项目?
处理 acked-stream.
时请记住以下定义
AckedSource[Out, Mat]
是 Source[AckTup[Out], Mat]]
的包装器
AckedFlow[In, Out, Mat]
是 Flow[AckTup[In], AckTup[Out], Mat]
的包装器
AckedSink[In, Mat]
是 Sink[AckTup[In], Mat]
的包装器
AckTup[T]
是 (Promise[Unit], T)
的别名
- 经典流组合器将对
AckTup
的 T
部分进行操作
.acked
组合器将完成 AckedFlow
的 Promise[Unit]
GraphDSL 边缘运算符 (~>
) 将适用于一堆 Akka 预定义形状(参见 GraphDSL.Implicits
的代码),但它不适用于 acked 定义的自定义形状-流库。
你有 2 条出路:
- 您可以按照
GraphDSL.Implicits
中的方式定义自己的 ~>
隐式运算符
- 您打开已确认的阶段以获得标准阶段。您可以使用
.wrappedRepr
访问包装阶段 - 在 AckedSource
、AckedFlow
和 AckedSink
. 上可用
根据 Stefano Bonetti 的出色指导,这里有一个可能的解决方案:
graph:
|--> short --|
rabbitMq --> before --| |--> after
|--> long --|
解决方案:
val before: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]].map[AckTup[String]](tup => {
val (p,m) = tup
(p, new String(m.data))
})
val short: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
val (p, s) = tup
println(s"short: $s")
tup
})
val long: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
val (p, s) = tup
println(s"long: $s")
tup
})
val after: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
val (p, s) = tup
println(s"all $s")
tup
})
def buildSplitGraph(source: AckedSource[String, SubscriptionRef]
, queueName: String
, splitLength: Int)(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = {
GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
val toShort = 0
val toLong = 1
// junctions
val split = builder.add(Partition[AckTup[String]](2, (tup: AckTup[String]) => {
val (p, s) = tup
if (s.length < splitLength) toShort else toLong
}
))
val merge = builder.add(Merge[AckTup[String]](2))
//graph
val beforeSplit = source.map(Message.queue(_, queueName)).wrappedRepr ~> AckedFlow(before).wrappedRepr
beforeSplit ~> split
// must do short, then long since the split goes in that order
split ~> AckedFlow(short).wrappedRepr ~> merge
split ~> AckedFlow(long).wrappedRepr ~> merge
// after the last AckedFlow, be sure to '.acked' so that the message will be removed from the queue
merge ~> AckedFlow(after).acked ~> s
ClosedShape
}}
正如 Stefano Bonetti 所说,关键是使用与 AckedFlow
关联的 .wrappedRepr
,然后在最后一步使用 .acked
组合器。
我已经开始使用 Akka Streams 和 Op-Rabbit,但有点困惑。
我需要根据谓词拆分流,然后将它们组合起来,就像我在创建图形和使用分区和合并时所做的那样。
我已经能够使用 GraphDSL.Builder 来做这样的事情,但似乎无法让它与 AckedSource/Flow/Sink
一起工作图表看起来像:
| --> flow1 --> |
source--> partition --> | | --> flow3 --> sink
| --> flow2 --> |
我不确定我是否应该使用 splitWhen,因为我总是恰好需要 2 个流程。
这是一个不做分区也不使用GraphDSL.Builder:
的示例def splitExample(source: AckedSource[String, SubscriptionRef],
queueName: String)
(implicit actorSystem: ActorSystem): RunnableGraph[SubscriptionRef] = {
val toStringFlow: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]]
.map[AckTup[String]](tup => {
val (p,m) = tup
(p, new String(m.data))
})
val printFlow1: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
.map[AckTup[String]](tup => {
val (p, s) = tup
println(s"flow1 processing $s")
tup
})
val printFlow2: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
.map[AckTup[String]](tup => {
val (p, s) = tup
println(s"flow2 processing $s")
tup
})
source
.map(Message.queue(_, queueName))
.via(AckedFlow(toStringFlow))
// partition if string.length < 10
.via(AckedFlow(printFlow1))
.via(AckedFlow(printFlow2))
.to(AckedSink.ack)
}
这是我似乎无法运行的代码:
import GraphDSL.Implicits._
def buildModelAcked(source: AckedSource[String, SubscriptionRef] , queueName: String)(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = {
import GraphDSL.Implicits._
GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
import GraphDSL.Implicits._
source.map(Message.queue(_, queueName)) ~> AckedFlow(toStringFlow) ~> AckedSink.ack
// source.map(Message.queue(_, queueName)).via(AckedFlow(toStringFlow)).to(AckedSink.ack)
ClosedShape
}}
编译器无法解析 ~>
运算符
所以我的问题是:
是否有使用 scala dsl 构建 Acked/Source/Flow/Sink 图表的示例项目?
是否有与我在这里尝试做的类似的分区和合并示例项目?
处理 acked-stream.
时请记住以下定义AckedSource[Out, Mat]
是Source[AckTup[Out], Mat]]
的包装器
AckedFlow[In, Out, Mat]
是Flow[AckTup[In], AckTup[Out], Mat]
的包装器
AckedSink[In, Mat]
是Sink[AckTup[In], Mat]
的包装器
AckTup[T]
是(Promise[Unit], T)
的别名
- 经典流组合器将对
AckTup
的 .acked
组合器将完成AckedFlow
的
T
部分进行操作
Promise[Unit]
GraphDSL 边缘运算符 (~>
) 将适用于一堆 Akka 预定义形状(参见 GraphDSL.Implicits
的代码),但它不适用于 acked 定义的自定义形状-流库。
你有 2 条出路:
- 您可以按照
GraphDSL.Implicits
中的方式定义自己的 - 您打开已确认的阶段以获得标准阶段。您可以使用
.wrappedRepr
访问包装阶段 - 在AckedSource
、AckedFlow
和AckedSink
. 上可用
~>
隐式运算符
根据 Stefano Bonetti 的出色指导,这里有一个可能的解决方案:
graph:
|--> short --|
rabbitMq --> before --| |--> after
|--> long --|
解决方案:
val before: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]].map[AckTup[String]](tup => {
val (p,m) = tup
(p, new String(m.data))
})
val short: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
val (p, s) = tup
println(s"short: $s")
tup
})
val long: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
val (p, s) = tup
println(s"long: $s")
tup
})
val after: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
val (p, s) = tup
println(s"all $s")
tup
})
def buildSplitGraph(source: AckedSource[String, SubscriptionRef]
, queueName: String
, splitLength: Int)(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = {
GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
val toShort = 0
val toLong = 1
// junctions
val split = builder.add(Partition[AckTup[String]](2, (tup: AckTup[String]) => {
val (p, s) = tup
if (s.length < splitLength) toShort else toLong
}
))
val merge = builder.add(Merge[AckTup[String]](2))
//graph
val beforeSplit = source.map(Message.queue(_, queueName)).wrappedRepr ~> AckedFlow(before).wrappedRepr
beforeSplit ~> split
// must do short, then long since the split goes in that order
split ~> AckedFlow(short).wrappedRepr ~> merge
split ~> AckedFlow(long).wrappedRepr ~> merge
// after the last AckedFlow, be sure to '.acked' so that the message will be removed from the queue
merge ~> AckedFlow(after).acked ~> s
ClosedShape
}}
正如 Stefano Bonetti 所说,关键是使用与 AckedFlow
关联的 .wrappedRepr
,然后在最后一步使用 .acked
组合器。