Akka Streams Reactive Kafka - OutOfMemoryError under high load
I'm running an Akka Streams Reactive Kafka application that should be able to handle heavy load. After the application runs for about 10 minutes, it shuts down with an OutOfMemoryError. I tried to debug the heap dump and found that akka.dispatch.Dispatcher is taking about 5 GB of memory. Below are my configuration files.
Akka version: 2.4.18
Reactive Kafka version: 2.4.18
1. application.conf:
consumer {
  num-consumers = "2"
  c1 {
    bootstrap-servers = "localhost:9092"
    bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1}
    groupId = "testakkagroup1"
    subscription-topic = "test"
    subscription-topic = ${?SUBSCRIPTION_TOPIC1}
    message-type = "UserEventMessage"
    poll-interval = 100ms
    poll-timeout = 50ms
    stop-timeout = 30s
    close-timeout = 20s
    commit-timeout = 15s
    wakeup-timeout = 10s
    use-dispatcher = "akka.kafka.default-dispatcher"
    kafka-clients {
      enable.auto.commit = true
    }
  }
}
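(A note on the duplicated keys: bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1} and subscription-topic = ${?SUBSCRIPTION_TOPIC1} are optional HOCON substitutions; when the named environment variable is set, it overrides the hard-coded default on the line above it, otherwise the default stands.)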
2. Launch command:
java -Xmx6g \
-Dcom.sun.management.jmxremote.port=27019 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=localhost \
-Dzookeeper.host=$ZK_HOST \
-Dzookeeper.port=$ZK_PORT \
-jar ./target/scala-2.11/test-assembly-1.0.jar
3. Source and Sink actors:
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration._

class EventStream extends Actor with ActorLogging {

  implicit val actorSystem = context.system
  implicit val timeout: Timeout = Timeout(10.seconds)
  implicit val materializer = ActorMaterializer()

  import context.dispatcher // ExecutionContext for the onComplete callback

  val settings = Settings(actorSystem).KafkaConsumers

  override def receive: Receive = {
    case StartUserEvent(id) =>
      startStreamConsumer(consumerConfig("EventMessage" + ".c" + id))
  }

  def startStreamConsumer(config: Map[String, String]) = {
    val consumerSource = createConsumerSource(config)
    val consumerSink = createConsumerSink()
    // actorA, actorB, actorC are defined elsewhere
    val messageProcessor = startMessageProcessor(actorA, actorB, actorC)
    log.info("Starting The UserEventStream processing")
    val future = consumerSource.map { message =>
      val m = s"${message.record.value()}"
      messageProcessor ? m
    }.runWith(consumerSink)
    future.onComplete {
      case _ => actorSystem.stop(messageProcessor)
    }
  }

  def startMessageProcessor(actorA: ActorRef, actorB: ActorRef, actorC: ActorRef) =
    actorSystem.actorOf(Props(classOf[MessageProcessor], actorA, actorB, actorC))

  def createConsumerSource(config: Map[String, String]) = {
    val kafkaMBAddress = config("bootstrap-servers")
    val groupID = config("groupId")
    val topicSubscription = config("subscription-topic").split(',').toList
    println(s"Subscription topics $topicSubscription")

    val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaMBAddress)
      .withGroupId(groupID)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*))
  }

  def createConsumerSink() = Sink.foreach(println)
}
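For reference, a simplified sketch of the consumerConfig helper used above, assuming it flattens the matching consumer block from application.conf using Typesafe Config (the actual Settings wiring is omitted, and the names here are illustrative):

import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.JavaConverters._

// Sketch only: resolve a key like "EventMessage.c1" to the consumer.c1
// block in application.conf and flatten it into a Map[String, String].
def consumerConfig(key: String): Map[String, String] = {
  val consumerId = key.split('.').last // e.g. "c1"
  val block: Config = ConfigFactory.load().getConfig(s"consumer.$consumerId")
  block.entrySet().asScala
    .map(entry => entry.getKey -> block.getString(entry.getKey))
    .toMap
}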
In this example, actorA, actorB, and actorC handle some business logic and database interactions. Am I missing anything in how I'm handling the Akka Reactive Kafka consumers, such as commit, error, or throttling configuration? Because looking at the heap dump, my guess is that messages are piling up.
One thing I would change is the following:
val future = consumerSource.map { message =>
  val m = s"${message.record.value()}"
  messageProcessor ? m
}.runWith(consumerSink)
In the above code, you're using ask to send messages to the messageProcessor actor and expecting replies, but for ask to function as a backpressure mechanism, you need to use it with mapAsync (more information is in the documentation). Something like the following:
val future =
  consumerSource
    .mapAsync(parallelism = 5) { message =>
      val m = s"${message.record.value()}"
      messageProcessor ? m
    }
    .runWith(consumerSink)
Adjust the parallelism as needed.
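One caveat: for ask to backpressure through mapAsync, the futures it returns must actually complete, which means MessageProcessor has to reply for every message within the 10-second Timeout, or the stream fails with an AskTimeoutException. A minimal sketch of that contract (the real processing with actorA, actorB, and actorC is elided):

import akka.actor.{Actor, ActorRef}

// Sketch of the contract that ask + mapAsync relies on:
// every message must be answered, or the asking side times out.
class MessageProcessor(actorA: ActorRef, actorB: ActorRef, actorC: ActorRef) extends Actor {
  override def receive: Receive = {
    case m: String =>
      // ... business logic and database interaction via actorA, actorB, actorC ...
      sender() ! "done" // this reply completes the Future returned by ask
  }
}

With that in place, mapAsync keeps at most parallelism asks in flight and only pulls new records from Kafka as replies arrive. With the original map version, the stream never waited on those futures, so records were pulled as fast as Sink.foreach(println) could print them and piled up unprocessed in the MessageProcessor mailbox, which is consistent with akka.dispatch.Dispatcher dominating the heap dump.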