I'm getting `already connected` when using Akka Streams Partition
I'm trying out the Akka Streams API, and I can't figure out why this code throws java.lang.IllegalArgumentException.
val graph = RunnableGraph.fromGraph(
  GraphDSL.create(source, sink)((source, sink) => Seq(source, sink)) {
    implicit b => (source, sink) =>
      import akka.stream.scaladsl.GraphDSL.Implicits._
      // Partitioner always picks output 1
      val partition = b.add(Partition[KinesisRecord](2, flow => 1))
      source ~> partition.in
      partition.out(0) ~> sink
      // This second connection to the same sink triggers the exception
      partition.out(1) ~> sink
      ClosedShape
  })
That is the current code. The error is as follows:
[info] - should consume *** FAILED ***
[info] java.lang.IllegalArgumentException: [Map.in] is already connected
[info] at akka.stream.scaladsl.GraphDSL$Builder.addEdge(Graph.scala:1567)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase.$tilde$greater(Graph.scala:1730)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase.$tilde$greater$(Graph.scala:1729)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.$tilde$greater(Graph.scala:1784)
[info] at akka.stream.scaladsl.GraphApply.create(GraphApply.scala:46)
[info] at akka.stream.scaladsl.GraphApply.create$(GraphApply.scala:41)
[info] at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:1529)
I'm using KinesisRecord as the element type of the source.
However, if I change outputPorts to 1 and delete the line
partition.out(1) ~> sink
it works.
I don't know whether I'm missing something or this is just a bug.
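I reproduced your error in my environment and worked out a fix. The exception occurs because both partition outputs are wired to the same sink inlet, and an inlet can only be connected once; putting a Merge stage in front of the sink fans the two outputs back into a single stream. Note that I used an Int source rather than a Kinesis source. You can substitute your own data type and it should work the same way.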
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.duration._

object AkkaStreamWithKinesis extends App {
  implicit val system: ActorSystem = ActorSystem("AkkaStreamWithKinesisSystem")

  val source = Source(1 to 1000).throttle(5, 1.second)
  val sink = Sink.foreach[Int](println(_))

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create(source, sink)((source, sink) => Seq(source, sink)) {
      implicit builder => (source, sink) =>
        import akka.stream.scaladsl.GraphDSL.Implicits._
        // Partitioner always picks output 1, as in the question
        val partition = builder.add(Partition[Int](2, flow => 1))
        // Merge fans both partition outputs back into one stream
        val merge = builder.add(Merge[Int](2))
        source ~> partition.in
        partition.out(0) ~> merge.in(0)
        partition.out(1) ~> merge.in(1)
        // The sink's single inlet is now connected exactly once
        merge.out ~> sink
        ClosedShape
    }).run()
}
Output:
09:15:36.443 [AkkaStreamWithKinesisSystem-akka.actor.default-dispatcher-6] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
1
2
3
4
5
6
7
8
9
10
11
12
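As a variation on the Merge approach, here is a minimal sketch for the case where the two partition outputs are meant to go to different destinations. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to run a graph), and the names `PartitionToTwoSinks`, `route`, `evens`, and `odds` are made up for illustration. Each output is connected to its own sink, so no inlet is wired twice. As far as I know, passing a sink value (rather than an imported shape) to `~>` adds a fresh copy of it to the builder for each connection.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}

object PartitionToTwoSinks {
  // Hypothetical partitioner: even numbers go to output 0, odd numbers to output 1
  def route(n: Int): Int = if (n % 2 == 0) 0 else 1

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("PartitionToTwoSinks")

    val source = Source(1 to 10)
    val evens = Sink.foreach[Int](n => println(s"even: $n"))
    val odds = Sink.foreach[Int](n => println(s"odd: $n"))

    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val partition = builder.add(Partition[Int](2, route))
      source ~> partition.in
      // Each output gets its own sink, so every inlet is connected exactly once
      partition.out(0) ~> evens
      partition.out(1) ~> odds
      ClosedShape
    })

    // The actor system keeps the JVM alive; call system.terminate() when done
    graph.run()
  }
}
```

If both outputs really should end up in one sink, the Merge version above is the simpler choice.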