使用 Akka Actors 的 OutOfMemoryError

OutOfMemoryError using Akka Actors

我有一个使用来自 RabbitMQ 的消息的应用程序,我正在使用 Actors 来处理工作。

这是我的方法:

object QueueConsumer extends Queue {

  def consumeMessages = {
    setupListener(buildChannel(resultsQueueName), resultsQueueName,
        resultsCallback)
  }

  private def setupListener(receivingChannel: Channel, queue: String, 
        f: (String) => Any) {
    Akka.system.scheduler.scheduleOnce(Duration(10, TimeUnit.SECONDS),
      Akka.system.actorOf(Props(new QueueActor(receivingChannel, queue, f))), "")
  }

}

class QueueActor(channel:Channel, queue:String, f:(String) => Any) extends Actor{

  def receive = {
    case _ => startReceiving
  }

  def startReceiving = {
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume(queue, false, consumer)
    while (true) {
      val delivery = consumer.nextDelivery()
      val msg = new String(delivery.getBody())
      context.actorOf(Props(new Actor {
    def receive = {
      case some: String => f(some)
    }
      })) ! msg
      channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
    }
  }

}

几秒后 运行,它抛出 java.lang.OutOfMemoryError:超出 GC 开销限制

我认为它正在发生,因为我正在为收到的每条消息启动一个新的 Actor - 所以如果我有 100000 条消息,它将创建 100000 个 actor。这是一个好方法还是我应该实施类似 'actors pool' 的方法?

任何人都知道如何在我的场景中避免 OutOfMemoryError?

提前致谢。

编辑1:

将方法更改为:

class Queue2(json:String) extends Actor {

  def receive = {
    case x: String =>
      val envelope = MessageEnvelopeParser.toObject(x)
      val processor = ProcessQueueServiceFactory.getProcessResultsService()
      envelope.messages.foreach(message => processor.process(message))
  }

}

object Queue2 {
  def props(json: String): Props = Props(new Queue2(json))
}

class QueueActor(channel:Channel, queue:String) extends Actor {

  def receive = {
    case _ => startReceiving
  }

  def startReceiving = {
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume(queue, false, consumer)
    while (true) {
      val delivery = consumer.nextDelivery()
      val msg = new String(delivery.getBody())
      context.actorOf(Queue2.props(msg))
      channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
    }
  }
}

您的每条消息 actor 需要在完成后自行停止,否则它们将永远存在。请参阅 Actor lifecycle and stopping Actors 上的文档。这里处理完成后只需要添加context.stop(self)即可。