TypedActor 是否可以替代在常规 actor 中执行阻塞调用?

Is TypedActor alternative to executing blocking call in regular actor?

我有一个场景,当 Actor 收到消息时,它会在阻塞调用中发送数据。这可能会成为一个问题,因为有时发送可能会因为我无法控制的原因而阻塞太长时间。这会导致 Actor 的消息框在此阻塞期间填满未决消息,从而导致我的应用程序内存不足。所以我需要一种方法将数据发送变成异步调用

我正在研究的一种解决方案是使用 TypedActor 发即弃方法进行发送,这应该避免 Actor 中的消息队列填满。但我的问题是,对于这种方法,我是否只是把罐头踢到一边,因为类型化的演员现在只会继承问题并填满它的消息队列?以下是可能实现的简化版本:

trait AsyncPipe[D] {        
    def send(data: D): Unit
}

class AsyncPipeImpl[D](fire: D => Unit) extends AsyncPipe[D] {  
    def send(data: D): Unit = fire(data)
}

class BlockingPipe {
    def send(str: String) = { 
        // Does a blocking send
    }
}

class MyActor extends Actor {

    val bp = new BlockingPipe

    val ap: AsyncPipe[String] = TypedActor(context.system).typedActorOf(
            TypedProps(classOf[AsyncPipe[String]], new AsyncPipeImpl(bp.send)))

    def receive = {
        case msg: String =>
            ap.send(msg)
    }
}

首先您需要决定是否要拒绝新的传入消息或超时挂起的请求。

尽管如此,我建议使用 Typed Router Pattern 来隐藏负责 多个请求处理程序 .

的逻辑

如果您想拒绝收到的消息,您应该使用适当的 bounded mailbox config

如果您想要超时请求,您应该在 TypedActor 中使用 Future 并指定超时。

当然你可以结合这两种策略。

阻塞可能根本不应该在 actor 中完成(只有当您需要对可变资源进行某种复杂的访问时才有意义 进行阻塞调用,IMO ) - 你能用一个简单的 Future 解决问题吗?

在任何你排队的地方都有排队的风险(akka 没有很好的背压支持 IME,尽管@hicolour 提供了一些选项)。最好重新安排事情,让您的 actor 拉取工作而不是将其推送给它(或者可能是像 iteratee 模式这样的相互拉动风格),这样您就不必排队。