Akka Stream TCP + Akka Stream Kafka 生产者不停止不发布消息并且不出错
Akka Stream TCP + Akka Stream Kafka producer not stopping not publishing messages and not error-ing out
我有以下流:
Source(IndexedSeq(ByteString.empty))
.via(
Tcp().outgoingConnection(bsAddress, bsPort)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
暂时还不错,可以消费Kafka主题上填充的消息了。但是有时,显然是在一个随机的时间间隔内,不再有消息发布,并且这段代码没有记录任何错误(printAndByeBye 将打印传递的消息并终止 actor 系统。)重新启动应用程序后,消息继续流.
知道如何知道这里发生了什么吗?
编辑:我把 Kamon 放在上面,我可以看到以下行为:
看起来有些东西在没有通知流应该停止的情况下停止了,但我不知道如何让它明确并停止流。
我建议创建具有监督属性的流来处理 TCP 连接中可能出现的异常:
val flow =
Tcp().outgoingConnection("", 12)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String).withAttributes(ActorAttributes.supervisionStrategy {
case ex: Throwable =>
println("Error ocurred: " + ex)
Supervision.Resume
}
和
Source(IndexedSeq(ByteString.empty))
.via(flow)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
如果流有任何错误,流将停止。使用此配置,您将看到流程是否引发了任何异常。
如果一切都变得安静,可能是因为在某处施加了背压。
尝试并有选择地用非背压感知阶段替换背压感知阶段,并检查问题是否仍然存在。
在您的情况下,有两种可能的背压来源:
1) TCP 连接
您可以尝试将 ByteString
的无限来源附加到 Kafka,按照以下方式做一些事情:
Source.cycle(() => List(???).iterator)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
2) Kafka sink
用一些日志替换它
Source(IndexedSeq(ByteString.empty))
.via(
Tcp().outgoingConnection(bsAddress, bsPort)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runForeach(println)
.onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
你能看出这两种情况中只有一种的问题吗?同时?在 none?
流没有失败,但 TCP 流空闲,因为发布数据的设备在一段时间后停止发送数据而没有断开连接。
而不是使用更简单的:
TCP().outgoingConnection(bsAddress, bsPort)
我最终使用:
def outgoingConnection(
remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil,
halfClose: Boolean = true,
connectTimeout: Duration = Duration.Inf,
idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = ???
所以
Tcp().outgoingConnection(bsAddress, bsPort)
成为
val connectTimeout: Duration = 1 second
val idleTimeout: Duration = 2 second
Tcp().outgoingConnection(
remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
connectTimeout = connectTimeout,
idleTimeout = idleTimeout
)
通过通知 idleTimeout,后续启动失败,可以重新启动另一个流程。
我有以下流:
Source(IndexedSeq(ByteString.empty))
.via(
Tcp().outgoingConnection(bsAddress, bsPort)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
暂时还不错,可以消费Kafka主题上填充的消息了。但是有时,显然是在一个随机的时间间隔内,不再有消息发布,并且这段代码没有记录任何错误(printAndByeBye 将打印传递的消息并终止 actor 系统。)重新启动应用程序后,消息继续流.
知道如何知道这里发生了什么吗?
编辑:我把 Kamon 放在上面,我可以看到以下行为:
看起来有些东西在没有通知流应该停止的情况下停止了,但我不知道如何让它明确并停止流。
我建议创建具有监督属性的流来处理 TCP 连接中可能出现的异常:
val flow =
Tcp().outgoingConnection("", 12)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String).withAttributes(ActorAttributes.supervisionStrategy {
case ex: Throwable =>
println("Error ocurred: " + ex)
Supervision.Resume
}
和
Source(IndexedSeq(ByteString.empty))
.via(flow)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
如果流有任何错误,流将停止。使用此配置,您将看到流程是否引发了任何异常。
如果一切都变得安静,可能是因为在某处施加了背压。 尝试并有选择地用非背压感知阶段替换背压感知阶段,并检查问题是否仍然存在。 在您的情况下,有两种可能的背压来源:
1) TCP 连接
您可以尝试将 ByteString
的无限来源附加到 Kafka,按照以下方式做一些事情:
Source.cycle(() => List(???).iterator)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
2) Kafka sink
用一些日志替换它
Source(IndexedSeq(ByteString.empty))
.via(
Tcp().outgoingConnection(bsAddress, bsPort)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runForeach(println)
.onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
你能看出这两种情况中只有一种的问题吗?同时?在 none?
流没有失败,但 TCP 流空闲,因为发布数据的设备在一段时间后停止发送数据而没有断开连接。 而不是使用更简单的:
TCP().outgoingConnection(bsAddress, bsPort)
我最终使用:
def outgoingConnection(
remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil,
halfClose: Boolean = true,
connectTimeout: Duration = Duration.Inf,
idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = ???
所以
Tcp().outgoingConnection(bsAddress, bsPort)
成为
val connectTimeout: Duration = 1 second
val idleTimeout: Duration = 2 second
Tcp().outgoingConnection(
remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
connectTimeout = connectTimeout,
idleTimeout = idleTimeout
)
通过通知 idleTimeout,后续启动失败,可以重新启动另一个流程。