使用 Akka Actors 的 OutOfMemoryError
OutOfMemoryError using Akka Actors
我有一个使用来自 RabbitMQ 的消息的应用程序,我正在使用 Actors 来处理工作。
这是我的方法:
object QueueConsumer extends Queue {
def consumeMessages = {
setupListener(buildChannel(resultsQueueName), resultsQueueName,
resultsCallback)
}
private def setupListener(receivingChannel: Channel, queue: String,
f: (String) => Any) {
Akka.system.scheduler.scheduleOnce(Duration(10, TimeUnit.SECONDS),
Akka.system.actorOf(Props(new QueueActor(receivingChannel, queue, f))), "")
}
}
class QueueActor(channel:Channel, queue:String, f:(String) => Any) extends Actor{
def receive = {
case _ => startReceiving
}
def startReceiving = {
val consumer = new QueueingConsumer(channel)
channel.basicConsume(queue, false, consumer)
while (true) {
val delivery = consumer.nextDelivery()
val msg = new String(delivery.getBody())
context.actorOf(Props(new Actor {
def receive = {
case some: String => f(some)
}
})) ! msg
channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
}
}
}
几秒后 运行,它抛出 java.lang.OutOfMemoryError:超出 GC 开销限制。
我认为它正在发生,因为我正在为收到的每条消息启动一个新的 Actor - 所以如果我有 100000 条消息,它将创建 100000 个 actor。这是一个好方法还是我应该实施类似 'actors pool' 的方法?
任何人都知道如何在我的场景中避免 OutOfMemoryError?
提前致谢。
编辑1:
将方法更改为:
class Queue2(json:String) extends Actor {
def receive = {
case x: String =>
val envelope = MessageEnvelopeParser.toObject(x)
val processor = ProcessQueueServiceFactory.getProcessResultsService()
envelope.messages.foreach(message => processor.process(message))
}
}
object Queue2 {
def props(json: String): Props = Props(new Queue2(json))
}
class QueueActor(channel:Channel, queue:String) extends Actor {
def receive = {
case _ => startReceiving
}
def startReceiving = {
val consumer = new QueueingConsumer(channel)
channel.basicConsume(queue, false, consumer)
while (true) {
val delivery = consumer.nextDelivery()
val msg = new String(delivery.getBody())
context.actorOf(Queue2.props(msg))
channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
}
}
}
您的每条消息 actor 需要在完成后自行停止,否则它们将永远存在。请参阅 Actor lifecycle and stopping Actors 上的文档。这里处理完成后只需要添加context.stop(self)
即可。
我有一个使用来自 RabbitMQ 的消息的应用程序,我正在使用 Actors 来处理工作。
这是我的方法:
object QueueConsumer extends Queue {
def consumeMessages = {
setupListener(buildChannel(resultsQueueName), resultsQueueName,
resultsCallback)
}
private def setupListener(receivingChannel: Channel, queue: String,
f: (String) => Any) {
Akka.system.scheduler.scheduleOnce(Duration(10, TimeUnit.SECONDS),
Akka.system.actorOf(Props(new QueueActor(receivingChannel, queue, f))), "")
}
}
class QueueActor(channel:Channel, queue:String, f:(String) => Any) extends Actor{
def receive = {
case _ => startReceiving
}
def startReceiving = {
val consumer = new QueueingConsumer(channel)
channel.basicConsume(queue, false, consumer)
while (true) {
val delivery = consumer.nextDelivery()
val msg = new String(delivery.getBody())
context.actorOf(Props(new Actor {
def receive = {
case some: String => f(some)
}
})) ! msg
channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
}
}
}
几秒后 运行,它抛出 java.lang.OutOfMemoryError:超出 GC 开销限制。
我认为它正在发生,因为我正在为收到的每条消息启动一个新的 Actor - 所以如果我有 100000 条消息,它将创建 100000 个 actor。这是一个好方法还是我应该实施类似 'actors pool' 的方法?
任何人都知道如何在我的场景中避免 OutOfMemoryError?
提前致谢。
编辑1:
将方法更改为:
class Queue2(json:String) extends Actor {
def receive = {
case x: String =>
val envelope = MessageEnvelopeParser.toObject(x)
val processor = ProcessQueueServiceFactory.getProcessResultsService()
envelope.messages.foreach(message => processor.process(message))
}
}
object Queue2 {
def props(json: String): Props = Props(new Queue2(json))
}
class QueueActor(channel:Channel, queue:String) extends Actor {
def receive = {
case _ => startReceiving
}
def startReceiving = {
val consumer = new QueueingConsumer(channel)
channel.basicConsume(queue, false, consumer)
while (true) {
val delivery = consumer.nextDelivery()
val msg = new String(delivery.getBody())
context.actorOf(Queue2.props(msg))
channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
}
}
}
您的每条消息 actor 需要在完成后自行停止,否则它们将永远存在。请参阅 Actor lifecycle and stopping Actors 上的文档。这里处理完成后只需要添加context.stop(self)
即可。